1use serde::Serialize;
2
3use crate::sansio::EffectId;
4use crate::{
5 CausalRef, RuntimeEffectControllerError, RuntimeEffectKind, RuntimeInvocation, RuntimeReplay,
6 RuntimeScope, RuntimeSubject,
7};
8
9pub(crate) fn turn_effect_invocation(
10 session_id: &str,
11 turn_id: &str,
12 turn_index: usize,
13 protocol_iteration: usize,
14 effect_id: EffectId,
15 effect_kind: RuntimeEffectKind,
16) -> RuntimeInvocation {
17 RuntimeInvocation::effect(
18 RuntimeScope::for_turn(session_id, turn_id, turn_index, protocol_iteration),
19 effect_id.0.to_string(),
20 effect_kind,
21 turn_effect_replay_key(
22 session_id,
23 turn_id,
24 turn_index,
25 protocol_iteration,
26 effect_kind,
27 effect_id,
28 ),
29 )
30}
31
32fn turn_effect_replay_key(
33 session_id: &str,
34 turn_id: &str,
35 turn_index: usize,
36 protocol_iteration: usize,
37 kind: RuntimeEffectKind,
38 effect_id: EffectId,
39) -> String {
40 format!(
41 "{session_id}:{turn_id}:{turn_index}:{protocol_iteration}:{}:{}",
42 kind.as_str(),
43 effect_id.0
44 )
45}
46
47pub(crate) fn child_effect_invocation(
48 parent: &RuntimeInvocation,
49 effect_id: impl Into<String>,
50 kind: RuntimeEffectKind,
51 replay_suffix: impl AsRef<str>,
52) -> RuntimeInvocation {
53 let replay_base = parent
54 .replay_key()
55 .or_else(|| parent.effect_id())
56 .unwrap_or("effect");
57 RuntimeInvocation {
58 scope: parent.scope.clone(),
59 subject: RuntimeSubject::Effect {
60 effect_id: effect_id.into(),
61 kind,
62 },
63 caused_by: parent.causal_ref(),
64 replay: Some(RuntimeReplay {
65 key: format!("{replay_base}:{}", replay_suffix.as_ref()),
66 }),
67 }
68}
69
70pub(crate) fn child_tool_effect_invocation(
71 parent: &RuntimeInvocation,
72 parent_effect_id: EffectId,
73 call_id: &str,
74) -> RuntimeInvocation {
75 child_effect_invocation(
76 parent,
77 format!("{}:{call_id}", parent_effect_id.0),
78 RuntimeEffectKind::ToolCall,
79 call_id,
80 )
81}
82
83pub(crate) fn tool_retry_sleep_invocation(
84 parent: &RuntimeInvocation,
85 tool_name: &str,
86 attempt: u32,
87) -> RuntimeInvocation {
88 let parent_effect_id = parent.effect_id().unwrap_or("effect");
89 child_effect_invocation(
90 parent,
91 format!("{parent_effect_id}:{tool_name}:attempt:{attempt}:sleep"),
92 RuntimeEffectKind::Sleep,
93 format!("{tool_name}:attempt:{attempt}:sleep"),
94 )
95}
96
97pub(crate) fn lashlang_sleep_invocation(
98 session_id: &str,
99 parent: Option<&RuntimeInvocation>,
100 scope: &str,
101 sequence: u64,
102) -> RuntimeInvocation {
103 let suffix = format!("lashlang:{scope}:sleep:{sequence}");
104 if let Some(parent) = parent {
105 let parent_effect_id = parent.effect_id().unwrap_or("effect");
106 return child_effect_invocation(
107 parent,
108 format!("{parent_effect_id}:{suffix}"),
109 RuntimeEffectKind::Sleep,
110 suffix,
111 );
112 }
113 RuntimeInvocation::effect(
114 RuntimeScope::new(session_id),
115 suffix.clone(),
116 RuntimeEffectKind::Sleep,
117 suffix,
118 )
119}
120
121pub(crate) fn lashlang_await_event_invocation(
122 session_id: &str,
123 parent: Option<&RuntimeInvocation>,
124 process_id: &str,
125 signal_name: &str,
126 ordinal: u64,
127) -> RuntimeInvocation {
128 let suffix = format!("lashlang:process:{process_id}:signal.{signal_name}:await:{ordinal}");
129 if let Some(parent) = parent {
130 let parent_effect_id = parent.effect_id().unwrap_or("effect");
131 return child_effect_invocation(
132 parent,
133 format!("{parent_effect_id}:{suffix}"),
134 RuntimeEffectKind::AwaitEvent,
135 suffix,
136 );
137 }
138 RuntimeInvocation::effect(
139 RuntimeScope::new(session_id),
140 suffix.clone(),
141 RuntimeEffectKind::AwaitEvent,
142 suffix,
143 )
144}
145
146pub(crate) fn process_effect_invocation(
147 session_id: &str,
148 parent: Option<RuntimeInvocation>,
149 effect_id: &str,
150) -> RuntimeInvocation {
151 if let Some(parent) = parent {
152 let scope = if let Some(turn_id) = parent.scope.turn_id.clone() {
153 RuntimeScope {
154 session_id: session_id.to_string(),
155 turn_id: Some(turn_id),
156 turn_index: parent.scope.turn_index,
157 protocol_iteration: parent.scope.protocol_iteration,
158 }
159 } else {
160 RuntimeScope::new(session_id)
161 };
162 let replay_base = parent.replay_key().unwrap_or("process");
163 return RuntimeInvocation {
164 scope,
165 subject: RuntimeSubject::Effect {
166 effect_id: effect_id.to_string(),
167 kind: RuntimeEffectKind::Process,
168 },
169 caused_by: parent.causal_ref(),
170 replay: Some(RuntimeReplay {
171 key: format!("{replay_base}:{effect_id}"),
172 }),
173 };
174 }
175 RuntimeInvocation::effect(
176 RuntimeScope::new(session_id),
177 effect_id.to_string(),
178 RuntimeEffectKind::Process,
179 format!("{session_id}:{effect_id}"),
180 )
181}
182
183pub fn process_event_invocation(
184 process_id: &str,
185 sequence: u64,
186 event_type: &str,
187 replay: Option<RuntimeReplay>,
188) -> RuntimeInvocation {
189 RuntimeInvocation {
190 scope: RuntimeScope::new("runtime"),
191 subject: RuntimeSubject::ProcessEvent {
192 process_id: process_id.to_string(),
193 sequence,
194 event_type: event_type.to_string(),
195 },
196 caused_by: Some(CausalRef::Process {
197 process_id: process_id.to_string(),
198 }),
199 replay,
200 }
201}
202
203pub(crate) fn trigger_occurrence_invocation(
204 session_id: &str,
205 occurrence_id: &str,
206) -> RuntimeInvocation {
207 RuntimeInvocation {
208 scope: RuntimeScope::new(session_id),
209 subject: RuntimeSubject::TriggerOccurrence {
210 occurrence_id: occurrence_id.to_string(),
211 },
212 caused_by: None,
213 replay: Some(RuntimeReplay {
214 key: format!("trigger:{occurrence_id}"),
215 }),
216 }
217}
218
219pub(crate) fn direct_effect_invocation(
220 session_id: &str,
221 usage_source: &str,
222 replay_discriminator: String,
223 turn_id: Option<&str>,
224 caused_by: Option<CausalRef>,
225) -> RuntimeInvocation {
226 let replay_key = match turn_id.filter(|value| !value.is_empty()) {
227 Some(turn_id) => {
228 format!("{session_id}:{turn_id}:direct:{usage_source}:{replay_discriminator}")
229 }
230 None => format!("{session_id}:direct:{usage_source}:{replay_discriminator}"),
231 };
232 RuntimeInvocation::effect(
233 RuntimeScope {
234 session_id: session_id.to_string(),
235 turn_id: turn_id.map(str::to_string),
236 turn_index: None,
237 protocol_iteration: None,
238 },
239 replay_discriminator,
240 RuntimeEffectKind::Direct,
241 replay_key,
242 )
243 .with_caused_by(caused_by)
244}
245
246pub(crate) fn direct_request_discriminator<T>(
247 request: &T,
248 explicit_replay: Option<&RuntimeReplay>,
249 caused_by: Option<&CausalRef>,
250) -> Result<String, RuntimeEffectControllerError>
251where
252 T: Serialize,
253{
254 let cause_discriminator = caused_by
255 .map(causal_replay_discriminator)
256 .unwrap_or_default();
257 if let Some(replay) = explicit_replay.filter(|replay| !replay.key.is_empty()) {
258 return Ok(format!("{cause_discriminator}request:{}", replay.key));
259 }
260 let digest = crate::stable_hash::stable_json_sha256_hex(request).map_err(|err| {
261 RuntimeEffectControllerError::new(
262 "runtime_effect_discriminator",
263 format!("failed to serialize runtime effect discriminator: {err}"),
264 )
265 })?;
266 Ok(format!("{cause_discriminator}sha256:{digest}"))
267}
268
269fn causal_replay_discriminator(caused_by: &CausalRef) -> String {
270 match caused_by {
271 CausalRef::Turn {
272 session_id,
273 turn_id,
274 } => format!("cause:turn:{session_id}:{turn_id}:"),
275 CausalRef::Effect {
276 session_id,
277 turn_id,
278 effect_id,
279 } => {
280 let turn = turn_id.as_deref().unwrap_or("");
281 format!("cause:effect:{session_id}:{turn}:{effect_id}:")
282 }
283 CausalRef::ToolCall {
284 session_id,
285 call_id,
286 } => format!("cause:tool_call:{session_id}:{call_id}:"),
287 CausalRef::Process { process_id } => format!("cause:process:{process_id}:"),
288 CausalRef::ProcessEvent {
289 process_id,
290 sequence,
291 } => format!("cause:process_event:{process_id}:{sequence}:"),
292 CausalRef::TriggerOccurrence { occurrence_id } => format!("cause:trigger:{occurrence_id}:"),
293 CausalRef::SessionNode {
294 session_id,
295 node_id,
296 } => format!("cause:session_node:{session_id}:{node_id}:"),
297 }
298}