Skip to main content

lash_core/runtime/
causal.rs

1use serde::Serialize;
2
3use crate::sansio::EffectId;
4use crate::{
5    CausalRef, RuntimeEffectControllerError, RuntimeEffectKind, RuntimeInvocation, RuntimeReplay,
6    RuntimeScope, RuntimeSubject,
7};
8
9pub(crate) fn turn_effect_invocation(
10    session_id: &str,
11    turn_id: &str,
12    turn_index: usize,
13    protocol_iteration: usize,
14    effect_id: EffectId,
15    effect_kind: RuntimeEffectKind,
16) -> RuntimeInvocation {
17    RuntimeInvocation::effect(
18        RuntimeScope::for_turn(session_id, turn_id, turn_index, protocol_iteration),
19        effect_id.0.to_string(),
20        effect_kind,
21        turn_effect_replay_key(
22            session_id,
23            turn_id,
24            turn_index,
25            protocol_iteration,
26            effect_kind,
27            effect_id,
28        ),
29    )
30}
31
32fn turn_effect_replay_key(
33    session_id: &str,
34    turn_id: &str,
35    turn_index: usize,
36    protocol_iteration: usize,
37    kind: RuntimeEffectKind,
38    effect_id: EffectId,
39) -> String {
40    format!(
41        "{session_id}:{turn_id}:{turn_index}:{protocol_iteration}:{}:{}",
42        kind.as_str(),
43        effect_id.0
44    )
45}
46
47pub(crate) fn child_effect_invocation(
48    parent: &RuntimeInvocation,
49    effect_id: impl Into<String>,
50    kind: RuntimeEffectKind,
51    replay_suffix: impl AsRef<str>,
52) -> RuntimeInvocation {
53    let replay_base = parent
54        .replay_key()
55        .or_else(|| parent.effect_id())
56        .unwrap_or("effect");
57    RuntimeInvocation {
58        scope: parent.scope.clone(),
59        subject: RuntimeSubject::Effect {
60            effect_id: effect_id.into(),
61            kind,
62        },
63        caused_by: parent.causal_ref(),
64        replay: Some(RuntimeReplay {
65            key: format!("{replay_base}:{}", replay_suffix.as_ref()),
66        }),
67    }
68}
69
70pub(crate) fn child_tool_effect_invocation(
71    parent: &RuntimeInvocation,
72    parent_effect_id: EffectId,
73    call_id: &str,
74) -> RuntimeInvocation {
75    child_effect_invocation(
76        parent,
77        format!("{}:{call_id}", parent_effect_id.0),
78        RuntimeEffectKind::ToolCall,
79        call_id,
80    )
81}
82
83pub(crate) fn tool_retry_sleep_invocation(
84    parent: &RuntimeInvocation,
85    tool_name: &str,
86    attempt: u32,
87) -> RuntimeInvocation {
88    let parent_effect_id = parent.effect_id().unwrap_or("effect");
89    child_effect_invocation(
90        parent,
91        format!("{parent_effect_id}:{tool_name}:attempt:{attempt}:sleep"),
92        RuntimeEffectKind::Sleep,
93        format!("{tool_name}:attempt:{attempt}:sleep"),
94    )
95}
96
97pub(crate) fn lashlang_sleep_invocation(
98    session_id: &str,
99    parent: Option<&RuntimeInvocation>,
100    scope: &str,
101    sequence: u64,
102) -> RuntimeInvocation {
103    let suffix = format!("lashlang:{scope}:sleep:{sequence}");
104    if let Some(parent) = parent {
105        let parent_effect_id = parent.effect_id().unwrap_or("effect");
106        return child_effect_invocation(
107            parent,
108            format!("{parent_effect_id}:{suffix}"),
109            RuntimeEffectKind::Sleep,
110            suffix,
111        );
112    }
113    RuntimeInvocation::effect(
114        RuntimeScope::new(session_id),
115        suffix.clone(),
116        RuntimeEffectKind::Sleep,
117        suffix,
118    )
119}
120
121pub(crate) fn lashlang_await_event_invocation(
122    session_id: &str,
123    parent: Option<&RuntimeInvocation>,
124    process_id: &str,
125    signal_name: &str,
126    ordinal: u64,
127) -> RuntimeInvocation {
128    let suffix = format!("lashlang:process:{process_id}:signal.{signal_name}:await:{ordinal}");
129    if let Some(parent) = parent {
130        let parent_effect_id = parent.effect_id().unwrap_or("effect");
131        return child_effect_invocation(
132            parent,
133            format!("{parent_effect_id}:{suffix}"),
134            RuntimeEffectKind::AwaitEvent,
135            suffix,
136        );
137    }
138    RuntimeInvocation::effect(
139        RuntimeScope::new(session_id),
140        suffix.clone(),
141        RuntimeEffectKind::AwaitEvent,
142        suffix,
143    )
144}
145
146pub(crate) fn process_effect_invocation(
147    session_id: &str,
148    parent: Option<RuntimeInvocation>,
149    effect_id: &str,
150) -> RuntimeInvocation {
151    if let Some(parent) = parent {
152        let scope = if let Some(turn_id) = parent.scope.turn_id.clone() {
153            RuntimeScope {
154                session_id: session_id.to_string(),
155                turn_id: Some(turn_id),
156                turn_index: parent.scope.turn_index,
157                protocol_iteration: parent.scope.protocol_iteration,
158            }
159        } else {
160            RuntimeScope::new(session_id)
161        };
162        let replay_base = parent.replay_key().unwrap_or("process");
163        return RuntimeInvocation {
164            scope,
165            subject: RuntimeSubject::Effect {
166                effect_id: effect_id.to_string(),
167                kind: RuntimeEffectKind::Process,
168            },
169            caused_by: parent.causal_ref(),
170            replay: Some(RuntimeReplay {
171                key: format!("{replay_base}:{effect_id}"),
172            }),
173        };
174    }
175    RuntimeInvocation::effect(
176        RuntimeScope::new(session_id),
177        effect_id.to_string(),
178        RuntimeEffectKind::Process,
179        format!("{session_id}:{effect_id}"),
180    )
181}
182
183pub fn process_event_invocation(
184    process_id: &str,
185    sequence: u64,
186    event_type: &str,
187    replay: Option<RuntimeReplay>,
188) -> RuntimeInvocation {
189    RuntimeInvocation {
190        scope: RuntimeScope::new("runtime"),
191        subject: RuntimeSubject::ProcessEvent {
192            process_id: process_id.to_string(),
193            sequence,
194            event_type: event_type.to_string(),
195        },
196        caused_by: Some(CausalRef::Process {
197            process_id: process_id.to_string(),
198        }),
199        replay,
200    }
201}
202
203pub(crate) fn host_event_invocation(session_id: &str, occurrence_id: &str) -> RuntimeInvocation {
204    RuntimeInvocation {
205        scope: RuntimeScope::new(session_id),
206        subject: RuntimeSubject::HostEvent {
207            occurrence_id: occurrence_id.to_string(),
208        },
209        caused_by: None,
210        replay: Some(RuntimeReplay {
211            key: format!("host_event:{occurrence_id}"),
212        }),
213    }
214}
215
216pub(crate) fn direct_effect_invocation(
217    session_id: &str,
218    usage_source: &str,
219    replay_discriminator: String,
220    turn_id: Option<&str>,
221    caused_by: Option<CausalRef>,
222) -> RuntimeInvocation {
223    let replay_key = match turn_id.filter(|value| !value.is_empty()) {
224        Some(turn_id) => {
225            format!("{session_id}:{turn_id}:direct:{usage_source}:{replay_discriminator}")
226        }
227        None => format!("{session_id}:direct:{usage_source}:{replay_discriminator}"),
228    };
229    RuntimeInvocation::effect(
230        RuntimeScope {
231            session_id: session_id.to_string(),
232            turn_id: turn_id.map(str::to_string),
233            turn_index: None,
234            protocol_iteration: None,
235        },
236        replay_discriminator,
237        RuntimeEffectKind::Direct,
238        replay_key,
239    )
240    .with_caused_by(caused_by)
241}
242
243pub(crate) fn direct_request_discriminator<T>(
244    request: &T,
245    explicit_replay: Option<&RuntimeReplay>,
246    caused_by: Option<&CausalRef>,
247) -> Result<String, RuntimeEffectControllerError>
248where
249    T: Serialize,
250{
251    let cause_discriminator = caused_by
252        .map(causal_replay_discriminator)
253        .unwrap_or_default();
254    if let Some(replay) = explicit_replay.filter(|replay| !replay.key.is_empty()) {
255        return Ok(format!("{cause_discriminator}request:{}", replay.key));
256    }
257    let digest = crate::stable_hash::stable_json_sha256_hex(request).map_err(|err| {
258        RuntimeEffectControllerError::new(
259            "runtime_effect_discriminator",
260            format!("failed to serialize runtime effect discriminator: {err}"),
261        )
262    })?;
263    Ok(format!("{cause_discriminator}sha256:{digest}"))
264}
265
266fn causal_replay_discriminator(caused_by: &CausalRef) -> String {
267    match caused_by {
268        CausalRef::Turn {
269            session_id,
270            turn_id,
271        } => format!("cause:turn:{session_id}:{turn_id}:"),
272        CausalRef::Effect {
273            session_id,
274            turn_id,
275            effect_id,
276        } => {
277            let turn = turn_id.as_deref().unwrap_or("");
278            format!("cause:effect:{session_id}:{turn}:{effect_id}:")
279        }
280        CausalRef::ToolCall {
281            session_id,
282            call_id,
283        } => format!("cause:tool_call:{session_id}:{call_id}:"),
284        CausalRef::Process { process_id } => format!("cause:process:{process_id}:"),
285        CausalRef::ProcessEvent {
286            process_id,
287            sequence,
288        } => format!("cause:process_event:{process_id}:{sequence}:"),
289        CausalRef::HostEvent { occurrence_id } => format!("cause:host_event:{occurrence_id}:"),
290        CausalRef::SessionNode {
291            session_id,
292            node_id,
293        } => format!("cause:session_node:{session_id}:{node_id}:"),
294    }
295}