Skip to main content

lash_core/runtime/
causal.rs

1use serde::Serialize;
2
3use crate::sansio::EffectId;
4use crate::{
5    CausalRef, RuntimeEffectControllerError, RuntimeEffectKind, RuntimeInvocation, RuntimeReplay,
6    RuntimeScope, RuntimeSubject,
7};
8
9pub(crate) fn turn_effect_invocation(
10    session_id: &str,
11    turn_id: &str,
12    turn_index: usize,
13    protocol_iteration: usize,
14    effect_id: EffectId,
15    effect_kind: RuntimeEffectKind,
16) -> RuntimeInvocation {
17    RuntimeInvocation::effect(
18        RuntimeScope::for_turn(session_id, turn_id, turn_index, protocol_iteration),
19        effect_id.0.to_string(),
20        effect_kind,
21        turn_effect_replay_key(
22            session_id,
23            turn_id,
24            turn_index,
25            protocol_iteration,
26            effect_kind,
27            effect_id,
28        ),
29    )
30}
31
32fn turn_effect_replay_key(
33    session_id: &str,
34    turn_id: &str,
35    turn_index: usize,
36    protocol_iteration: usize,
37    kind: RuntimeEffectKind,
38    effect_id: EffectId,
39) -> String {
40    format!(
41        "{session_id}:{turn_id}:{turn_index}:{protocol_iteration}:{}:{}",
42        kind.as_str(),
43        effect_id.0
44    )
45}
46
47pub(crate) fn child_effect_invocation(
48    parent: &RuntimeInvocation,
49    effect_id: impl Into<String>,
50    kind: RuntimeEffectKind,
51    replay_suffix: impl AsRef<str>,
52) -> RuntimeInvocation {
53    let replay_base = parent
54        .replay_key()
55        .or_else(|| parent.effect_id())
56        .unwrap_or("effect");
57    RuntimeInvocation {
58        scope: parent.scope.clone(),
59        subject: RuntimeSubject::Effect {
60            effect_id: effect_id.into(),
61            kind,
62        },
63        caused_by: parent.causal_ref(),
64        replay: Some(RuntimeReplay {
65            key: format!("{replay_base}:{}", replay_suffix.as_ref()),
66        }),
67    }
68}
69
70pub(crate) fn child_tool_effect_invocation(
71    parent: &RuntimeInvocation,
72    parent_effect_id: EffectId,
73    call_id: &str,
74) -> RuntimeInvocation {
75    child_effect_invocation(
76        parent,
77        format!("{}:{call_id}", parent_effect_id.0),
78        RuntimeEffectKind::ToolCall,
79        call_id,
80    )
81}
82
83pub(crate) fn tool_retry_sleep_invocation(
84    parent: &RuntimeInvocation,
85    tool_name: &str,
86    attempt: u32,
87) -> RuntimeInvocation {
88    let parent_effect_id = parent.effect_id().unwrap_or("effect");
89    child_effect_invocation(
90        parent,
91        format!("{parent_effect_id}:{tool_name}:attempt:{attempt}:sleep"),
92        RuntimeEffectKind::Sleep,
93        format!("{tool_name}:attempt:{attempt}:sleep"),
94    )
95}
96
97pub(crate) fn lashlang_sleep_invocation(
98    session_id: &str,
99    parent: Option<&RuntimeInvocation>,
100    scope: &str,
101    sequence: u64,
102) -> RuntimeInvocation {
103    let suffix = format!("lashlang:{scope}:sleep:{sequence}");
104    if let Some(parent) = parent {
105        let parent_effect_id = parent.effect_id().unwrap_or("effect");
106        return child_effect_invocation(
107            parent,
108            format!("{parent_effect_id}:{suffix}"),
109            RuntimeEffectKind::Sleep,
110            suffix,
111        );
112    }
113    RuntimeInvocation::effect(
114        RuntimeScope::new(session_id),
115        suffix.clone(),
116        RuntimeEffectKind::Sleep,
117        suffix,
118    )
119}
120
121pub(crate) fn process_effect_invocation(
122    session_id: &str,
123    parent: Option<RuntimeInvocation>,
124    effect_id: &str,
125) -> RuntimeInvocation {
126    if let Some(parent) = parent {
127        let scope = if let Some(turn_id) = parent.scope.turn_id.clone() {
128            RuntimeScope {
129                session_id: session_id.to_string(),
130                turn_id: Some(turn_id),
131                turn_index: parent.scope.turn_index,
132                protocol_iteration: parent.scope.protocol_iteration,
133            }
134        } else {
135            RuntimeScope::new(session_id)
136        };
137        let replay_base = parent.replay_key().unwrap_or("process");
138        return RuntimeInvocation {
139            scope,
140            subject: RuntimeSubject::Effect {
141                effect_id: effect_id.to_string(),
142                kind: RuntimeEffectKind::Process,
143            },
144            caused_by: parent.causal_ref(),
145            replay: Some(RuntimeReplay {
146                key: format!("{replay_base}:{effect_id}"),
147            }),
148        };
149    }
150    RuntimeInvocation::effect(
151        RuntimeScope::new(session_id),
152        effect_id.to_string(),
153        RuntimeEffectKind::Process,
154        format!("{session_id}:{effect_id}"),
155    )
156}
157
158pub fn process_event_invocation(
159    owner_session_id: &str,
160    process_id: &str,
161    sequence: u64,
162    event_type: &str,
163    replay: Option<RuntimeReplay>,
164) -> RuntimeInvocation {
165    RuntimeInvocation {
166        scope: RuntimeScope::new(owner_session_id),
167        subject: RuntimeSubject::ProcessEvent {
168            process_id: process_id.to_string(),
169            sequence,
170            event_type: event_type.to_string(),
171        },
172        caused_by: Some(CausalRef::Process {
173            process_id: process_id.to_string(),
174        }),
175        replay,
176    }
177}
178
179pub(crate) fn session_node_invocation(
180    session_id: &str,
181    node_id: impl Into<String>,
182) -> RuntimeInvocation {
183    RuntimeInvocation {
184        scope: RuntimeScope::new(session_id),
185        subject: RuntimeSubject::SessionNode {
186            node_id: node_id.into(),
187        },
188        caused_by: None,
189        replay: None,
190    }
191}
192
193pub(crate) fn direct_effect_invocation(
194    session_id: &str,
195    usage_source: &str,
196    replay_discriminator: String,
197    turn_id: Option<&str>,
198    caused_by: Option<CausalRef>,
199) -> RuntimeInvocation {
200    let replay_key = match turn_id.filter(|value| !value.is_empty()) {
201        Some(turn_id) => {
202            format!("{session_id}:{turn_id}:direct:{usage_source}:{replay_discriminator}")
203        }
204        None => format!("{session_id}:direct:{usage_source}:{replay_discriminator}"),
205    };
206    RuntimeInvocation::effect(
207        RuntimeScope {
208            session_id: session_id.to_string(),
209            turn_id: turn_id.map(str::to_string),
210            turn_index: None,
211            protocol_iteration: None,
212        },
213        replay_discriminator,
214        RuntimeEffectKind::Direct,
215        replay_key,
216    )
217    .with_caused_by(caused_by)
218}
219
220pub(crate) fn direct_request_discriminator<T>(
221    request: &T,
222    explicit_replay: Option<&RuntimeReplay>,
223    caused_by: Option<&CausalRef>,
224) -> Result<String, RuntimeEffectControllerError>
225where
226    T: Serialize,
227{
228    let cause_discriminator = caused_by
229        .map(causal_replay_discriminator)
230        .unwrap_or_default();
231    if let Some(replay) = explicit_replay.filter(|replay| !replay.key.is_empty()) {
232        return Ok(format!("{cause_discriminator}request:{}", replay.key));
233    }
234    let digest = crate::stable_hash::stable_json_sha256_hex(request).map_err(|err| {
235        RuntimeEffectControllerError::new(
236            "runtime_effect_discriminator",
237            format!("failed to serialize runtime effect discriminator: {err}"),
238        )
239    })?;
240    Ok(format!("{cause_discriminator}sha256:{digest}"))
241}
242
243fn causal_replay_discriminator(caused_by: &CausalRef) -> String {
244    match caused_by {
245        CausalRef::Turn {
246            session_id,
247            turn_id,
248        } => format!("cause:turn:{session_id}:{turn_id}:"),
249        CausalRef::Effect {
250            session_id,
251            turn_id,
252            effect_id,
253        } => {
254            let turn = turn_id.as_deref().unwrap_or("");
255            format!("cause:effect:{session_id}:{turn}:{effect_id}:")
256        }
257        CausalRef::ToolCall {
258            session_id,
259            call_id,
260        } => format!("cause:tool_call:{session_id}:{call_id}:"),
261        CausalRef::Process { process_id } => format!("cause:process:{process_id}:"),
262        CausalRef::ProcessEvent {
263            process_id,
264            sequence,
265        } => format!("cause:process_event:{process_id}:{sequence}:"),
266        CausalRef::SessionNode {
267            session_id,
268            node_id,
269        } => format!("cause:session_node:{session_id}:{node_id}:"),
270    }
271}