Skip to main content

lash_core/runtime/
causal.rs

1use serde::Serialize;
2
3use crate::sansio::EffectId;
4use crate::{
5    CausalRef, RuntimeEffectControllerError, RuntimeEffectKind, RuntimeInvocation, RuntimeReplay,
6    RuntimeScope, RuntimeSubject,
7};
8
9pub(crate) fn turn_effect_invocation(
10    session_id: &str,
11    turn_id: &str,
12    turn_index: usize,
13    protocol_iteration: usize,
14    effect_id: EffectId,
15    effect_kind: RuntimeEffectKind,
16) -> RuntimeInvocation {
17    RuntimeInvocation::effect(
18        RuntimeScope::for_turn(session_id, turn_id, turn_index, protocol_iteration),
19        effect_id.0.to_string(),
20        effect_kind,
21        turn_effect_replay_key(
22            session_id,
23            turn_id,
24            turn_index,
25            protocol_iteration,
26            effect_kind,
27            effect_id,
28        ),
29    )
30}
31
32fn turn_effect_replay_key(
33    session_id: &str,
34    turn_id: &str,
35    turn_index: usize,
36    protocol_iteration: usize,
37    kind: RuntimeEffectKind,
38    effect_id: EffectId,
39) -> String {
40    format!(
41        "{session_id}:{turn_id}:{turn_index}:{protocol_iteration}:{}:{}",
42        kind.as_str(),
43        effect_id.0
44    )
45}
46
47pub(crate) fn child_effect_invocation(
48    parent: &RuntimeInvocation,
49    effect_id: impl Into<String>,
50    kind: RuntimeEffectKind,
51    replay_suffix: impl AsRef<str>,
52) -> RuntimeInvocation {
53    let replay_base = parent
54        .replay_key()
55        .or_else(|| parent.effect_id())
56        .unwrap_or("effect");
57    RuntimeInvocation {
58        scope: parent.scope.clone(),
59        subject: RuntimeSubject::Effect {
60            effect_id: effect_id.into(),
61            kind,
62        },
63        caused_by: parent.causal_ref(),
64        replay: Some(RuntimeReplay {
65            key: format!("{replay_base}:{}", replay_suffix.as_ref()),
66        }),
67    }
68}
69
70pub(crate) fn tool_retry_sleep_invocation(
71    parent: &RuntimeInvocation,
72    tool_name: &str,
73    attempt: u32,
74) -> RuntimeInvocation {
75    let parent_effect_id = parent.effect_id().unwrap_or("effect");
76    child_effect_invocation(
77        parent,
78        format!("{parent_effect_id}:{tool_name}:attempt:{attempt}:sleep"),
79        RuntimeEffectKind::Sleep,
80        format!("{tool_name}:attempt:{attempt}:sleep"),
81    )
82}
83
84pub(crate) fn process_sleep_invocation(
85    session_id: &str,
86    parent: Option<&RuntimeInvocation>,
87    scope: &str,
88    sequence: u64,
89) -> RuntimeInvocation {
90    let suffix = format!("process:{scope}:sleep:{sequence}");
91    if let Some(parent) = parent {
92        let parent_effect_id = parent.effect_id().unwrap_or("effect");
93        return child_effect_invocation(
94            parent,
95            format!("{parent_effect_id}:{suffix}"),
96            RuntimeEffectKind::Sleep,
97            suffix,
98        );
99    }
100    RuntimeInvocation::effect(
101        RuntimeScope::new(session_id),
102        suffix.clone(),
103        RuntimeEffectKind::Sleep,
104        suffix,
105    )
106}
107
108pub(crate) fn process_await_event_invocation(
109    session_id: &str,
110    parent: Option<&RuntimeInvocation>,
111    process_id: &str,
112    signal_name: &str,
113    ordinal: u64,
114) -> RuntimeInvocation {
115    let suffix = format!("process:{process_id}:signal.{signal_name}:await:{ordinal}");
116    if let Some(parent) = parent {
117        let parent_effect_id = parent.effect_id().unwrap_or("effect");
118        return child_effect_invocation(
119            parent,
120            format!("{parent_effect_id}:{suffix}"),
121            RuntimeEffectKind::AwaitEvent,
122            suffix,
123        );
124    }
125    RuntimeInvocation::effect(
126        RuntimeScope::new(session_id),
127        suffix.clone(),
128        RuntimeEffectKind::AwaitEvent,
129        suffix,
130    )
131}
132
133pub(crate) fn process_effect_invocation(
134    session_id: &str,
135    parent: Option<RuntimeInvocation>,
136    effect_id: &str,
137) -> RuntimeInvocation {
138    if let Some(parent) = parent {
139        let scope = if let Some(turn_id) = parent.scope.turn_id.clone() {
140            RuntimeScope {
141                session_id: session_id.to_string(),
142                turn_id: Some(turn_id),
143                turn_index: parent.scope.turn_index,
144                protocol_iteration: parent.scope.protocol_iteration,
145            }
146        } else {
147            RuntimeScope::new(session_id)
148        };
149        let replay_base = parent.replay_key().unwrap_or("process");
150        return RuntimeInvocation {
151            scope,
152            subject: RuntimeSubject::Effect {
153                effect_id: effect_id.to_string(),
154                kind: RuntimeEffectKind::Process,
155            },
156            caused_by: parent.causal_ref(),
157            replay: Some(RuntimeReplay {
158                key: format!("{replay_base}:{effect_id}"),
159            }),
160        };
161    }
162    RuntimeInvocation::effect(
163        RuntimeScope::new(session_id),
164        effect_id.to_string(),
165        RuntimeEffectKind::Process,
166        format!("{session_id}:{effect_id}"),
167    )
168}
169
170pub fn process_event_invocation(
171    process_id: &str,
172    sequence: u64,
173    event_type: &str,
174    replay: Option<RuntimeReplay>,
175) -> RuntimeInvocation {
176    RuntimeInvocation {
177        scope: RuntimeScope::new("runtime"),
178        subject: RuntimeSubject::ProcessEvent {
179            process_id: process_id.to_string(),
180            sequence,
181            event_type: event_type.to_string(),
182        },
183        caused_by: Some(CausalRef::Process {
184            process_id: process_id.to_string(),
185        }),
186        replay,
187    }
188}
189
190pub(crate) fn trigger_occurrence_invocation(
191    session_id: &str,
192    occurrence_id: &str,
193) -> RuntimeInvocation {
194    RuntimeInvocation {
195        scope: RuntimeScope::new(session_id),
196        subject: RuntimeSubject::TriggerOccurrence {
197            occurrence_id: occurrence_id.to_string(),
198        },
199        caused_by: None,
200        replay: Some(RuntimeReplay {
201            key: format!("trigger:{occurrence_id}"),
202        }),
203    }
204}
205
206pub(crate) fn direct_effect_invocation(
207    session_id: &str,
208    usage_source: &str,
209    replay_discriminator: String,
210    turn_id: Option<&str>,
211    caused_by: Option<CausalRef>,
212) -> RuntimeInvocation {
213    let replay_key = match turn_id.filter(|value| !value.is_empty()) {
214        Some(turn_id) => {
215            format!("{session_id}:{turn_id}:direct:{usage_source}:{replay_discriminator}")
216        }
217        None => format!("{session_id}:direct:{usage_source}:{replay_discriminator}"),
218    };
219    RuntimeInvocation::effect(
220        RuntimeScope {
221            session_id: session_id.to_string(),
222            turn_id: turn_id.map(str::to_string),
223            turn_index: None,
224            protocol_iteration: None,
225        },
226        replay_discriminator,
227        RuntimeEffectKind::Direct,
228        replay_key,
229    )
230    .with_caused_by(caused_by)
231}
232
233pub(crate) fn direct_request_discriminator<T>(
234    request: &T,
235    explicit_replay: Option<&RuntimeReplay>,
236    caused_by: Option<&CausalRef>,
237) -> Result<String, RuntimeEffectControllerError>
238where
239    T: Serialize,
240{
241    let cause_discriminator = caused_by
242        .map(causal_replay_discriminator)
243        .unwrap_or_default();
244    if let Some(replay) = explicit_replay.filter(|replay| !replay.key.is_empty()) {
245        return Ok(format!("{cause_discriminator}request:{}", replay.key));
246    }
247    let digest = crate::stable_hash::stable_json_sha256_hex(request).map_err(|err| {
248        RuntimeEffectControllerError::new(
249            "runtime_effect_discriminator",
250            format!("failed to serialize runtime effect discriminator: {err}"),
251        )
252    })?;
253    Ok(format!("{cause_discriminator}sha256:{digest}"))
254}
255
256fn causal_replay_discriminator(caused_by: &CausalRef) -> String {
257    match caused_by {
258        CausalRef::Turn {
259            session_id,
260            turn_id,
261        } => format!("cause:turn:{session_id}:{turn_id}:"),
262        CausalRef::Effect {
263            session_id,
264            turn_id,
265            effect_id,
266        } => {
267            let turn = turn_id.as_deref().unwrap_or("");
268            format!("cause:effect:{session_id}:{turn}:{effect_id}:")
269        }
270        CausalRef::ToolCall {
271            session_id,
272            call_id,
273        } => format!("cause:tool_call:{session_id}:{call_id}:"),
274        CausalRef::Process { process_id } => format!("cause:process:{process_id}:"),
275        CausalRef::ProcessEvent {
276            process_id,
277            sequence,
278        } => format!("cause:process_event:{process_id}:{sequence}:"),
279        CausalRef::TriggerOccurrence { occurrence_id } => format!("cause:trigger:{occurrence_id}:"),
280        CausalRef::SessionNode {
281            session_id,
282            node_id,
283        } => format!("cause:session_node:{session_id}:{node_id}:"),
284    }
285}