1use serde::Serialize;
2
3use crate::sansio::EffectId;
4use crate::{
5 CausalRef, RuntimeEffectControllerError, RuntimeEffectKind, RuntimeInvocation, RuntimeReplay,
6 RuntimeScope, RuntimeSubject,
7};
8
9pub(crate) fn turn_effect_invocation(
10 session_id: &str,
11 turn_id: &str,
12 turn_index: usize,
13 protocol_iteration: usize,
14 effect_id: EffectId,
15 effect_kind: RuntimeEffectKind,
16) -> RuntimeInvocation {
17 RuntimeInvocation::effect(
18 RuntimeScope::for_turn(session_id, turn_id, turn_index, protocol_iteration),
19 effect_id.0.to_string(),
20 effect_kind,
21 turn_effect_replay_key(
22 session_id,
23 turn_id,
24 turn_index,
25 protocol_iteration,
26 effect_kind,
27 effect_id,
28 ),
29 )
30}
31
32fn turn_effect_replay_key(
33 session_id: &str,
34 turn_id: &str,
35 turn_index: usize,
36 protocol_iteration: usize,
37 kind: RuntimeEffectKind,
38 effect_id: EffectId,
39) -> String {
40 format!(
41 "{session_id}:{turn_id}:{turn_index}:{protocol_iteration}:{}:{}",
42 kind.as_str(),
43 effect_id.0
44 )
45}
46
47pub(crate) fn child_effect_invocation(
48 parent: &RuntimeInvocation,
49 effect_id: impl Into<String>,
50 kind: RuntimeEffectKind,
51 replay_suffix: impl AsRef<str>,
52) -> RuntimeInvocation {
53 let replay_base = parent
54 .replay_key()
55 .or_else(|| parent.effect_id())
56 .unwrap_or("effect");
57 RuntimeInvocation {
58 scope: parent.scope.clone(),
59 subject: RuntimeSubject::Effect {
60 effect_id: effect_id.into(),
61 kind,
62 },
63 caused_by: parent.causal_ref(),
64 replay: Some(RuntimeReplay {
65 key: format!("{replay_base}:{}", replay_suffix.as_ref()),
66 }),
67 }
68}
69
70pub(crate) fn child_tool_effect_invocation(
71 parent: &RuntimeInvocation,
72 parent_effect_id: EffectId,
73 call_id: &str,
74) -> RuntimeInvocation {
75 child_effect_invocation(
76 parent,
77 format!("{}:{call_id}", parent_effect_id.0),
78 RuntimeEffectKind::ToolCall,
79 call_id,
80 )
81}
82
83pub(crate) fn tool_retry_sleep_invocation(
84 parent: &RuntimeInvocation,
85 tool_name: &str,
86 attempt: u32,
87) -> RuntimeInvocation {
88 let parent_effect_id = parent.effect_id().unwrap_or("effect");
89 child_effect_invocation(
90 parent,
91 format!("{parent_effect_id}:{tool_name}:attempt:{attempt}:sleep"),
92 RuntimeEffectKind::Sleep,
93 format!("{tool_name}:attempt:{attempt}:sleep"),
94 )
95}
96
97pub(crate) fn lashlang_sleep_invocation(
98 session_id: &str,
99 parent: Option<&RuntimeInvocation>,
100 scope: &str,
101 sequence: u64,
102) -> RuntimeInvocation {
103 let suffix = format!("lashlang:{scope}:sleep:{sequence}");
104 if let Some(parent) = parent {
105 let parent_effect_id = parent.effect_id().unwrap_or("effect");
106 return child_effect_invocation(
107 parent,
108 format!("{parent_effect_id}:{suffix}"),
109 RuntimeEffectKind::Sleep,
110 suffix,
111 );
112 }
113 RuntimeInvocation::effect(
114 RuntimeScope::new(session_id),
115 suffix.clone(),
116 RuntimeEffectKind::Sleep,
117 suffix,
118 )
119}
120
121pub(crate) fn lashlang_await_event_invocation(
122 session_id: &str,
123 parent: Option<&RuntimeInvocation>,
124 process_id: &str,
125 signal_name: &str,
126 ordinal: u64,
127) -> RuntimeInvocation {
128 let suffix = format!("lashlang:process:{process_id}:signal.{signal_name}:await:{ordinal}");
129 if let Some(parent) = parent {
130 let parent_effect_id = parent.effect_id().unwrap_or("effect");
131 return child_effect_invocation(
132 parent,
133 format!("{parent_effect_id}:{suffix}"),
134 RuntimeEffectKind::AwaitEvent,
135 suffix,
136 );
137 }
138 RuntimeInvocation::effect(
139 RuntimeScope::new(session_id),
140 suffix.clone(),
141 RuntimeEffectKind::AwaitEvent,
142 suffix,
143 )
144}
145
146pub(crate) fn process_effect_invocation(
147 session_id: &str,
148 parent: Option<RuntimeInvocation>,
149 effect_id: &str,
150) -> RuntimeInvocation {
151 if let Some(parent) = parent {
152 let scope = if let Some(turn_id) = parent.scope.turn_id.clone() {
153 RuntimeScope {
154 session_id: session_id.to_string(),
155 turn_id: Some(turn_id),
156 turn_index: parent.scope.turn_index,
157 protocol_iteration: parent.scope.protocol_iteration,
158 }
159 } else {
160 RuntimeScope::new(session_id)
161 };
162 let replay_base = parent.replay_key().unwrap_or("process");
163 return RuntimeInvocation {
164 scope,
165 subject: RuntimeSubject::Effect {
166 effect_id: effect_id.to_string(),
167 kind: RuntimeEffectKind::Process,
168 },
169 caused_by: parent.causal_ref(),
170 replay: Some(RuntimeReplay {
171 key: format!("{replay_base}:{effect_id}"),
172 }),
173 };
174 }
175 RuntimeInvocation::effect(
176 RuntimeScope::new(session_id),
177 effect_id.to_string(),
178 RuntimeEffectKind::Process,
179 format!("{session_id}:{effect_id}"),
180 )
181}
182
183pub fn process_event_invocation(
184 process_id: &str,
185 sequence: u64,
186 event_type: &str,
187 replay: Option<RuntimeReplay>,
188) -> RuntimeInvocation {
189 RuntimeInvocation {
190 scope: RuntimeScope::new("runtime"),
191 subject: RuntimeSubject::ProcessEvent {
192 process_id: process_id.to_string(),
193 sequence,
194 event_type: event_type.to_string(),
195 },
196 caused_by: Some(CausalRef::Process {
197 process_id: process_id.to_string(),
198 }),
199 replay,
200 }
201}
202
203pub(crate) fn host_event_invocation(session_id: &str, occurrence_id: &str) -> RuntimeInvocation {
204 RuntimeInvocation {
205 scope: RuntimeScope::new(session_id),
206 subject: RuntimeSubject::HostEvent {
207 occurrence_id: occurrence_id.to_string(),
208 },
209 caused_by: None,
210 replay: Some(RuntimeReplay {
211 key: format!("host_event:{occurrence_id}"),
212 }),
213 }
214}
215
216pub(crate) fn direct_effect_invocation(
217 session_id: &str,
218 usage_source: &str,
219 replay_discriminator: String,
220 turn_id: Option<&str>,
221 caused_by: Option<CausalRef>,
222) -> RuntimeInvocation {
223 let replay_key = match turn_id.filter(|value| !value.is_empty()) {
224 Some(turn_id) => {
225 format!("{session_id}:{turn_id}:direct:{usage_source}:{replay_discriminator}")
226 }
227 None => format!("{session_id}:direct:{usage_source}:{replay_discriminator}"),
228 };
229 RuntimeInvocation::effect(
230 RuntimeScope {
231 session_id: session_id.to_string(),
232 turn_id: turn_id.map(str::to_string),
233 turn_index: None,
234 protocol_iteration: None,
235 },
236 replay_discriminator,
237 RuntimeEffectKind::Direct,
238 replay_key,
239 )
240 .with_caused_by(caused_by)
241}
242
243pub(crate) fn direct_request_discriminator<T>(
244 request: &T,
245 explicit_replay: Option<&RuntimeReplay>,
246 caused_by: Option<&CausalRef>,
247) -> Result<String, RuntimeEffectControllerError>
248where
249 T: Serialize,
250{
251 let cause_discriminator = caused_by
252 .map(causal_replay_discriminator)
253 .unwrap_or_default();
254 if let Some(replay) = explicit_replay.filter(|replay| !replay.key.is_empty()) {
255 return Ok(format!("{cause_discriminator}request:{}", replay.key));
256 }
257 let digest = crate::stable_hash::stable_json_sha256_hex(request).map_err(|err| {
258 RuntimeEffectControllerError::new(
259 "runtime_effect_discriminator",
260 format!("failed to serialize runtime effect discriminator: {err}"),
261 )
262 })?;
263 Ok(format!("{cause_discriminator}sha256:{digest}"))
264}
265
266fn causal_replay_discriminator(caused_by: &CausalRef) -> String {
267 match caused_by {
268 CausalRef::Turn {
269 session_id,
270 turn_id,
271 } => format!("cause:turn:{session_id}:{turn_id}:"),
272 CausalRef::Effect {
273 session_id,
274 turn_id,
275 effect_id,
276 } => {
277 let turn = turn_id.as_deref().unwrap_or("");
278 format!("cause:effect:{session_id}:{turn}:{effect_id}:")
279 }
280 CausalRef::ToolCall {
281 session_id,
282 call_id,
283 } => format!("cause:tool_call:{session_id}:{call_id}:"),
284 CausalRef::Process { process_id } => format!("cause:process:{process_id}:"),
285 CausalRef::ProcessEvent {
286 process_id,
287 sequence,
288 } => format!("cause:process_event:{process_id}:{sequence}:"),
289 CausalRef::HostEvent { occurrence_id } => format!("cause:host_event:{occurrence_id}:"),
290 CausalRef::SessionNode {
291 session_id,
292 node_id,
293 } => format!("cause:session_node:{session_id}:{node_id}:"),
294 }
295}