Skip to main content

lash_core/runtime/
causal.rs

1use serde::Serialize;
2
3use crate::sansio::EffectId;
4use crate::{
5    CausalRef, RuntimeEffectControllerError, RuntimeEffectKind, RuntimeInvocation, RuntimeReplay,
6    RuntimeScope, RuntimeSubject,
7};
8
9pub(crate) fn turn_effect_invocation(
10    session_id: &str,
11    turn_id: &str,
12    turn_index: usize,
13    protocol_iteration: usize,
14    effect_id: EffectId,
15    effect_kind: RuntimeEffectKind,
16) -> RuntimeInvocation {
17    RuntimeInvocation::effect(
18        RuntimeScope::for_turn(session_id, turn_id, turn_index, protocol_iteration),
19        effect_id.0.to_string(),
20        effect_kind,
21        turn_effect_replay_key(
22            session_id,
23            turn_id,
24            turn_index,
25            protocol_iteration,
26            effect_kind,
27            effect_id,
28        ),
29    )
30}
31
32fn turn_effect_replay_key(
33    session_id: &str,
34    turn_id: &str,
35    turn_index: usize,
36    protocol_iteration: usize,
37    kind: RuntimeEffectKind,
38    effect_id: EffectId,
39) -> String {
40    format!(
41        "{session_id}:{turn_id}:{turn_index}:{protocol_iteration}:{}:{}",
42        kind.as_str(),
43        effect_id.0
44    )
45}
46
47pub(crate) fn child_effect_invocation(
48    parent: &RuntimeInvocation,
49    effect_id: impl Into<String>,
50    kind: RuntimeEffectKind,
51    replay_suffix: impl AsRef<str>,
52) -> RuntimeInvocation {
53    let replay_base = parent
54        .replay_key()
55        .or_else(|| parent.effect_id())
56        .unwrap_or("effect");
57    RuntimeInvocation {
58        scope: parent.scope.clone(),
59        subject: RuntimeSubject::Effect {
60            effect_id: effect_id.into(),
61            kind,
62        },
63        caused_by: parent.causal_ref(),
64        replay: Some(RuntimeReplay {
65            key: format!("{replay_base}:{}", replay_suffix.as_ref()),
66        }),
67    }
68}
69
70pub(crate) fn child_tool_effect_invocation(
71    parent: &RuntimeInvocation,
72    parent_effect_id: EffectId,
73    call_id: &str,
74) -> RuntimeInvocation {
75    child_effect_invocation(
76        parent,
77        format!("{}:{call_id}", parent_effect_id.0),
78        RuntimeEffectKind::ToolCall,
79        call_id,
80    )
81}
82
83pub(crate) fn tool_retry_sleep_invocation(
84    parent: &RuntimeInvocation,
85    tool_name: &str,
86    attempt: u32,
87) -> RuntimeInvocation {
88    let parent_effect_id = parent.effect_id().unwrap_or("effect");
89    child_effect_invocation(
90        parent,
91        format!("{parent_effect_id}:{tool_name}:attempt:{attempt}:sleep"),
92        RuntimeEffectKind::Sleep,
93        format!("{tool_name}:attempt:{attempt}:sleep"),
94    )
95}
96
97pub(crate) fn lashlang_sleep_invocation(
98    session_id: &str,
99    parent: Option<&RuntimeInvocation>,
100    scope: &str,
101    sequence: u64,
102) -> RuntimeInvocation {
103    let suffix = format!("lashlang:{scope}:sleep:{sequence}");
104    if let Some(parent) = parent {
105        let parent_effect_id = parent.effect_id().unwrap_or("effect");
106        return child_effect_invocation(
107            parent,
108            format!("{parent_effect_id}:{suffix}"),
109            RuntimeEffectKind::Sleep,
110            suffix,
111        );
112    }
113    RuntimeInvocation::effect(
114        RuntimeScope::new(session_id),
115        suffix.clone(),
116        RuntimeEffectKind::Sleep,
117        suffix,
118    )
119}
120
121pub(crate) fn lashlang_await_event_invocation(
122    session_id: &str,
123    parent: Option<&RuntimeInvocation>,
124    process_id: &str,
125    signal_name: &str,
126    ordinal: u64,
127) -> RuntimeInvocation {
128    let suffix = format!("lashlang:process:{process_id}:signal.{signal_name}:await:{ordinal}");
129    if let Some(parent) = parent {
130        let parent_effect_id = parent.effect_id().unwrap_or("effect");
131        return child_effect_invocation(
132            parent,
133            format!("{parent_effect_id}:{suffix}"),
134            RuntimeEffectKind::AwaitEvent,
135            suffix,
136        );
137    }
138    RuntimeInvocation::effect(
139        RuntimeScope::new(session_id),
140        suffix.clone(),
141        RuntimeEffectKind::AwaitEvent,
142        suffix,
143    )
144}
145
146pub(crate) fn process_effect_invocation(
147    session_id: &str,
148    parent: Option<RuntimeInvocation>,
149    effect_id: &str,
150) -> RuntimeInvocation {
151    if let Some(parent) = parent {
152        let scope = if let Some(turn_id) = parent.scope.turn_id.clone() {
153            RuntimeScope {
154                session_id: session_id.to_string(),
155                turn_id: Some(turn_id),
156                turn_index: parent.scope.turn_index,
157                protocol_iteration: parent.scope.protocol_iteration,
158            }
159        } else {
160            RuntimeScope::new(session_id)
161        };
162        let replay_base = parent.replay_key().unwrap_or("process");
163        return RuntimeInvocation {
164            scope,
165            subject: RuntimeSubject::Effect {
166                effect_id: effect_id.to_string(),
167                kind: RuntimeEffectKind::Process,
168            },
169            caused_by: parent.causal_ref(),
170            replay: Some(RuntimeReplay {
171                key: format!("{replay_base}:{effect_id}"),
172            }),
173        };
174    }
175    RuntimeInvocation::effect(
176        RuntimeScope::new(session_id),
177        effect_id.to_string(),
178        RuntimeEffectKind::Process,
179        format!("{session_id}:{effect_id}"),
180    )
181}
182
183pub fn process_event_invocation(
184    process_id: &str,
185    sequence: u64,
186    event_type: &str,
187    replay: Option<RuntimeReplay>,
188) -> RuntimeInvocation {
189    RuntimeInvocation {
190        scope: RuntimeScope::new("runtime"),
191        subject: RuntimeSubject::ProcessEvent {
192            process_id: process_id.to_string(),
193            sequence,
194            event_type: event_type.to_string(),
195        },
196        caused_by: Some(CausalRef::Process {
197            process_id: process_id.to_string(),
198        }),
199        replay,
200    }
201}
202
203pub(crate) fn trigger_occurrence_invocation(
204    session_id: &str,
205    occurrence_id: &str,
206) -> RuntimeInvocation {
207    RuntimeInvocation {
208        scope: RuntimeScope::new(session_id),
209        subject: RuntimeSubject::TriggerOccurrence {
210            occurrence_id: occurrence_id.to_string(),
211        },
212        caused_by: None,
213        replay: Some(RuntimeReplay {
214            key: format!("trigger:{occurrence_id}"),
215        }),
216    }
217}
218
219pub(crate) fn direct_effect_invocation(
220    session_id: &str,
221    usage_source: &str,
222    replay_discriminator: String,
223    turn_id: Option<&str>,
224    caused_by: Option<CausalRef>,
225) -> RuntimeInvocation {
226    let replay_key = match turn_id.filter(|value| !value.is_empty()) {
227        Some(turn_id) => {
228            format!("{session_id}:{turn_id}:direct:{usage_source}:{replay_discriminator}")
229        }
230        None => format!("{session_id}:direct:{usage_source}:{replay_discriminator}"),
231    };
232    RuntimeInvocation::effect(
233        RuntimeScope {
234            session_id: session_id.to_string(),
235            turn_id: turn_id.map(str::to_string),
236            turn_index: None,
237            protocol_iteration: None,
238        },
239        replay_discriminator,
240        RuntimeEffectKind::Direct,
241        replay_key,
242    )
243    .with_caused_by(caused_by)
244}
245
246pub(crate) fn direct_request_discriminator<T>(
247    request: &T,
248    explicit_replay: Option<&RuntimeReplay>,
249    caused_by: Option<&CausalRef>,
250) -> Result<String, RuntimeEffectControllerError>
251where
252    T: Serialize,
253{
254    let cause_discriminator = caused_by
255        .map(causal_replay_discriminator)
256        .unwrap_or_default();
257    if let Some(replay) = explicit_replay.filter(|replay| !replay.key.is_empty()) {
258        return Ok(format!("{cause_discriminator}request:{}", replay.key));
259    }
260    let digest = crate::stable_hash::stable_json_sha256_hex(request).map_err(|err| {
261        RuntimeEffectControllerError::new(
262            "runtime_effect_discriminator",
263            format!("failed to serialize runtime effect discriminator: {err}"),
264        )
265    })?;
266    Ok(format!("{cause_discriminator}sha256:{digest}"))
267}
268
269fn causal_replay_discriminator(caused_by: &CausalRef) -> String {
270    match caused_by {
271        CausalRef::Turn {
272            session_id,
273            turn_id,
274        } => format!("cause:turn:{session_id}:{turn_id}:"),
275        CausalRef::Effect {
276            session_id,
277            turn_id,
278            effect_id,
279        } => {
280            let turn = turn_id.as_deref().unwrap_or("");
281            format!("cause:effect:{session_id}:{turn}:{effect_id}:")
282        }
283        CausalRef::ToolCall {
284            session_id,
285            call_id,
286        } => format!("cause:tool_call:{session_id}:{call_id}:"),
287        CausalRef::Process { process_id } => format!("cause:process:{process_id}:"),
288        CausalRef::ProcessEvent {
289            process_id,
290            sequence,
291        } => format!("cause:process_event:{process_id}:{sequence}:"),
292        CausalRef::TriggerOccurrence { occurrence_id } => format!("cause:trigger:{occurrence_id}:"),
293        CausalRef::SessionNode {
294            session_id,
295            node_id,
296        } => format!("cause:session_node:{session_id}:{node_id}:"),
297    }
298}