1use serde::Serialize;
2
3use crate::sansio::EffectId;
4use crate::{
5 CausalRef, RuntimeEffectControllerError, RuntimeEffectKind, RuntimeInvocation, RuntimeReplay,
6 RuntimeScope, RuntimeSubject,
7};
8
9pub(crate) fn turn_effect_invocation(
10 session_id: &str,
11 turn_id: &str,
12 turn_index: usize,
13 protocol_iteration: usize,
14 effect_id: EffectId,
15 effect_kind: RuntimeEffectKind,
16) -> RuntimeInvocation {
17 RuntimeInvocation::effect(
18 RuntimeScope::for_turn(session_id, turn_id, turn_index, protocol_iteration),
19 effect_id.0.to_string(),
20 effect_kind,
21 turn_effect_replay_key(
22 session_id,
23 turn_id,
24 turn_index,
25 protocol_iteration,
26 effect_kind,
27 effect_id,
28 ),
29 )
30}
31
32fn turn_effect_replay_key(
33 session_id: &str,
34 turn_id: &str,
35 turn_index: usize,
36 protocol_iteration: usize,
37 kind: RuntimeEffectKind,
38 effect_id: EffectId,
39) -> String {
40 format!(
41 "{session_id}:{turn_id}:{turn_index}:{protocol_iteration}:{}:{}",
42 kind.as_str(),
43 effect_id.0
44 )
45}
46
47pub(crate) fn child_effect_invocation(
48 parent: &RuntimeInvocation,
49 effect_id: impl Into<String>,
50 kind: RuntimeEffectKind,
51 replay_suffix: impl AsRef<str>,
52) -> RuntimeInvocation {
53 let replay_base = parent
54 .replay_key()
55 .or_else(|| parent.effect_id())
56 .unwrap_or("effect");
57 RuntimeInvocation {
58 scope: parent.scope.clone(),
59 subject: RuntimeSubject::Effect {
60 effect_id: effect_id.into(),
61 kind,
62 },
63 caused_by: parent.causal_ref(),
64 replay: Some(RuntimeReplay {
65 key: format!("{replay_base}:{}", replay_suffix.as_ref()),
66 }),
67 }
68}
69
70pub(crate) fn child_tool_effect_invocation(
71 parent: &RuntimeInvocation,
72 parent_effect_id: EffectId,
73 call_id: &str,
74) -> RuntimeInvocation {
75 child_effect_invocation(
76 parent,
77 format!("{}:{call_id}", parent_effect_id.0),
78 RuntimeEffectKind::ToolCall,
79 call_id,
80 )
81}
82
83pub(crate) fn tool_retry_sleep_invocation(
84 parent: &RuntimeInvocation,
85 tool_name: &str,
86 attempt: u32,
87) -> RuntimeInvocation {
88 let parent_effect_id = parent.effect_id().unwrap_or("effect");
89 child_effect_invocation(
90 parent,
91 format!("{parent_effect_id}:{tool_name}:attempt:{attempt}:sleep"),
92 RuntimeEffectKind::Sleep,
93 format!("{tool_name}:attempt:{attempt}:sleep"),
94 )
95}
96
97pub(crate) fn lashlang_sleep_invocation(
98 session_id: &str,
99 parent: Option<&RuntimeInvocation>,
100 scope: &str,
101 sequence: u64,
102) -> RuntimeInvocation {
103 let suffix = format!("lashlang:{scope}:sleep:{sequence}");
104 if let Some(parent) = parent {
105 let parent_effect_id = parent.effect_id().unwrap_or("effect");
106 return child_effect_invocation(
107 parent,
108 format!("{parent_effect_id}:{suffix}"),
109 RuntimeEffectKind::Sleep,
110 suffix,
111 );
112 }
113 RuntimeInvocation::effect(
114 RuntimeScope::new(session_id),
115 suffix.clone(),
116 RuntimeEffectKind::Sleep,
117 suffix,
118 )
119}
120
121pub(crate) fn process_effect_invocation(
122 session_id: &str,
123 parent: Option<RuntimeInvocation>,
124 effect_id: &str,
125) -> RuntimeInvocation {
126 if let Some(parent) = parent {
127 let scope = if let Some(turn_id) = parent.scope.turn_id.clone() {
128 RuntimeScope {
129 session_id: session_id.to_string(),
130 turn_id: Some(turn_id),
131 turn_index: parent.scope.turn_index,
132 protocol_iteration: parent.scope.protocol_iteration,
133 }
134 } else {
135 RuntimeScope::new(session_id)
136 };
137 let replay_base = parent.replay_key().unwrap_or("process");
138 return RuntimeInvocation {
139 scope,
140 subject: RuntimeSubject::Effect {
141 effect_id: effect_id.to_string(),
142 kind: RuntimeEffectKind::Process,
143 },
144 caused_by: parent.causal_ref(),
145 replay: Some(RuntimeReplay {
146 key: format!("{replay_base}:{effect_id}"),
147 }),
148 };
149 }
150 RuntimeInvocation::effect(
151 RuntimeScope::new(session_id),
152 effect_id.to_string(),
153 RuntimeEffectKind::Process,
154 format!("{session_id}:{effect_id}"),
155 )
156}
157
158pub fn process_event_invocation(
159 owner_session_id: &str,
160 process_id: &str,
161 sequence: u64,
162 event_type: &str,
163 replay: Option<RuntimeReplay>,
164) -> RuntimeInvocation {
165 RuntimeInvocation {
166 scope: RuntimeScope::new(owner_session_id),
167 subject: RuntimeSubject::ProcessEvent {
168 process_id: process_id.to_string(),
169 sequence,
170 event_type: event_type.to_string(),
171 },
172 caused_by: Some(CausalRef::Process {
173 process_id: process_id.to_string(),
174 }),
175 replay,
176 }
177}
178
179pub(crate) fn session_node_invocation(
180 session_id: &str,
181 node_id: impl Into<String>,
182) -> RuntimeInvocation {
183 RuntimeInvocation {
184 scope: RuntimeScope::new(session_id),
185 subject: RuntimeSubject::SessionNode {
186 node_id: node_id.into(),
187 },
188 caused_by: None,
189 replay: None,
190 }
191}
192
193pub(crate) fn direct_effect_invocation(
194 session_id: &str,
195 usage_source: &str,
196 replay_discriminator: String,
197 turn_id: Option<&str>,
198 caused_by: Option<CausalRef>,
199) -> RuntimeInvocation {
200 let replay_key = match turn_id.filter(|value| !value.is_empty()) {
201 Some(turn_id) => {
202 format!("{session_id}:{turn_id}:direct:{usage_source}:{replay_discriminator}")
203 }
204 None => format!("{session_id}:direct:{usage_source}:{replay_discriminator}"),
205 };
206 RuntimeInvocation::effect(
207 RuntimeScope {
208 session_id: session_id.to_string(),
209 turn_id: turn_id.map(str::to_string),
210 turn_index: None,
211 protocol_iteration: None,
212 },
213 replay_discriminator,
214 RuntimeEffectKind::Direct,
215 replay_key,
216 )
217 .with_caused_by(caused_by)
218}
219
220pub(crate) fn direct_request_discriminator<T>(
221 request: &T,
222 explicit_replay: Option<&RuntimeReplay>,
223 caused_by: Option<&CausalRef>,
224) -> Result<String, RuntimeEffectControllerError>
225where
226 T: Serialize,
227{
228 let cause_discriminator = caused_by
229 .map(causal_replay_discriminator)
230 .unwrap_or_default();
231 if let Some(replay) = explicit_replay.filter(|replay| !replay.key.is_empty()) {
232 return Ok(format!("{cause_discriminator}request:{}", replay.key));
233 }
234 let digest = crate::stable_hash::stable_json_sha256_hex(request).map_err(|err| {
235 RuntimeEffectControllerError::new(
236 "runtime_effect_discriminator",
237 format!("failed to serialize runtime effect discriminator: {err}"),
238 )
239 })?;
240 Ok(format!("{cause_discriminator}sha256:{digest}"))
241}
242
243fn causal_replay_discriminator(caused_by: &CausalRef) -> String {
244 match caused_by {
245 CausalRef::Turn {
246 session_id,
247 turn_id,
248 } => format!("cause:turn:{session_id}:{turn_id}:"),
249 CausalRef::Effect {
250 session_id,
251 turn_id,
252 effect_id,
253 } => {
254 let turn = turn_id.as_deref().unwrap_or("");
255 format!("cause:effect:{session_id}:{turn}:{effect_id}:")
256 }
257 CausalRef::ToolCall {
258 session_id,
259 call_id,
260 } => format!("cause:tool_call:{session_id}:{call_id}:"),
261 CausalRef::Process { process_id } => format!("cause:process:{process_id}:"),
262 CausalRef::ProcessEvent {
263 process_id,
264 sequence,
265 } => format!("cause:process_event:{process_id}:{sequence}:"),
266 CausalRef::SessionNode {
267 session_id,
268 node_id,
269 } => format!("cause:session_node:{session_id}:{node_id}:"),
270 }
271}