1use serde::Serialize;
2
3use crate::sansio::EffectId;
4use crate::{
5 CausalRef, RuntimeEffectControllerError, RuntimeEffectKind, RuntimeInvocation, RuntimeReplay,
6 RuntimeScope, RuntimeSubject,
7};
8
9pub(crate) fn turn_effect_invocation(
10 session_id: &str,
11 turn_id: &str,
12 turn_index: usize,
13 protocol_iteration: usize,
14 effect_id: EffectId,
15 effect_kind: RuntimeEffectKind,
16) -> RuntimeInvocation {
17 RuntimeInvocation::effect(
18 RuntimeScope::for_turn(session_id, turn_id, turn_index, protocol_iteration),
19 effect_id.0.to_string(),
20 effect_kind,
21 turn_effect_replay_key(
22 session_id,
23 turn_id,
24 turn_index,
25 protocol_iteration,
26 effect_kind,
27 effect_id,
28 ),
29 )
30}
31
32fn turn_effect_replay_key(
33 session_id: &str,
34 turn_id: &str,
35 turn_index: usize,
36 protocol_iteration: usize,
37 kind: RuntimeEffectKind,
38 effect_id: EffectId,
39) -> String {
40 format!(
41 "{session_id}:{turn_id}:{turn_index}:{protocol_iteration}:{}:{}",
42 kind.as_str(),
43 effect_id.0
44 )
45}
46
47pub(crate) fn child_effect_invocation(
48 parent: &RuntimeInvocation,
49 effect_id: impl Into<String>,
50 kind: RuntimeEffectKind,
51 replay_suffix: impl AsRef<str>,
52) -> RuntimeInvocation {
53 let replay_base = parent
54 .replay_key()
55 .or_else(|| parent.effect_id())
56 .unwrap_or("effect");
57 RuntimeInvocation {
58 scope: parent.scope.clone(),
59 subject: RuntimeSubject::Effect {
60 effect_id: effect_id.into(),
61 kind,
62 },
63 caused_by: parent.causal_ref(),
64 replay: Some(RuntimeReplay {
65 key: format!("{replay_base}:{}", replay_suffix.as_ref()),
66 }),
67 }
68}
69
70pub(crate) fn tool_retry_sleep_invocation(
71 parent: &RuntimeInvocation,
72 tool_name: &str,
73 attempt: u32,
74) -> RuntimeInvocation {
75 let parent_effect_id = parent.effect_id().unwrap_or("effect");
76 child_effect_invocation(
77 parent,
78 format!("{parent_effect_id}:{tool_name}:attempt:{attempt}:sleep"),
79 RuntimeEffectKind::Sleep,
80 format!("{tool_name}:attempt:{attempt}:sleep"),
81 )
82}
83
84pub(crate) fn process_sleep_invocation(
85 session_id: &str,
86 parent: Option<&RuntimeInvocation>,
87 scope: &str,
88 sequence: u64,
89) -> RuntimeInvocation {
90 let suffix = format!("process:{scope}:sleep:{sequence}");
91 if let Some(parent) = parent {
92 let parent_effect_id = parent.effect_id().unwrap_or("effect");
93 return child_effect_invocation(
94 parent,
95 format!("{parent_effect_id}:{suffix}"),
96 RuntimeEffectKind::Sleep,
97 suffix,
98 );
99 }
100 RuntimeInvocation::effect(
101 RuntimeScope::new(session_id),
102 suffix.clone(),
103 RuntimeEffectKind::Sleep,
104 suffix,
105 )
106}
107
108pub(crate) fn process_await_event_invocation(
109 session_id: &str,
110 parent: Option<&RuntimeInvocation>,
111 process_id: &str,
112 signal_name: &str,
113 ordinal: u64,
114) -> RuntimeInvocation {
115 let suffix = format!("process:{process_id}:signal.{signal_name}:await:{ordinal}");
116 if let Some(parent) = parent {
117 let parent_effect_id = parent.effect_id().unwrap_or("effect");
118 return child_effect_invocation(
119 parent,
120 format!("{parent_effect_id}:{suffix}"),
121 RuntimeEffectKind::AwaitEvent,
122 suffix,
123 );
124 }
125 RuntimeInvocation::effect(
126 RuntimeScope::new(session_id),
127 suffix.clone(),
128 RuntimeEffectKind::AwaitEvent,
129 suffix,
130 )
131}
132
133pub(crate) fn process_effect_invocation(
134 session_id: &str,
135 parent: Option<RuntimeInvocation>,
136 effect_id: &str,
137) -> RuntimeInvocation {
138 if let Some(parent) = parent {
139 let scope = if let Some(turn_id) = parent.scope.turn_id.clone() {
140 RuntimeScope {
141 session_id: session_id.to_string(),
142 turn_id: Some(turn_id),
143 turn_index: parent.scope.turn_index,
144 protocol_iteration: parent.scope.protocol_iteration,
145 }
146 } else {
147 RuntimeScope::new(session_id)
148 };
149 let replay_base = parent.replay_key().unwrap_or("process");
150 return RuntimeInvocation {
151 scope,
152 subject: RuntimeSubject::Effect {
153 effect_id: effect_id.to_string(),
154 kind: RuntimeEffectKind::Process,
155 },
156 caused_by: parent.causal_ref(),
157 replay: Some(RuntimeReplay {
158 key: format!("{replay_base}:{effect_id}"),
159 }),
160 };
161 }
162 RuntimeInvocation::effect(
163 RuntimeScope::new(session_id),
164 effect_id.to_string(),
165 RuntimeEffectKind::Process,
166 format!("{session_id}:{effect_id}"),
167 )
168}
169
170pub fn process_event_invocation(
171 process_id: &str,
172 sequence: u64,
173 event_type: &str,
174 replay: Option<RuntimeReplay>,
175) -> RuntimeInvocation {
176 RuntimeInvocation {
177 scope: RuntimeScope::new("runtime"),
178 subject: RuntimeSubject::ProcessEvent {
179 process_id: process_id.to_string(),
180 sequence,
181 event_type: event_type.to_string(),
182 },
183 caused_by: Some(CausalRef::Process {
184 process_id: process_id.to_string(),
185 }),
186 replay,
187 }
188}
189
190pub(crate) fn trigger_occurrence_invocation(
191 session_id: &str,
192 occurrence_id: &str,
193) -> RuntimeInvocation {
194 RuntimeInvocation {
195 scope: RuntimeScope::new(session_id),
196 subject: RuntimeSubject::TriggerOccurrence {
197 occurrence_id: occurrence_id.to_string(),
198 },
199 caused_by: None,
200 replay: Some(RuntimeReplay {
201 key: format!("trigger:{occurrence_id}"),
202 }),
203 }
204}
205
206pub(crate) fn direct_effect_invocation(
207 session_id: &str,
208 usage_source: &str,
209 replay_discriminator: String,
210 turn_id: Option<&str>,
211 caused_by: Option<CausalRef>,
212) -> RuntimeInvocation {
213 let replay_key = match turn_id.filter(|value| !value.is_empty()) {
214 Some(turn_id) => {
215 format!("{session_id}:{turn_id}:direct:{usage_source}:{replay_discriminator}")
216 }
217 None => format!("{session_id}:direct:{usage_source}:{replay_discriminator}"),
218 };
219 RuntimeInvocation::effect(
220 RuntimeScope {
221 session_id: session_id.to_string(),
222 turn_id: turn_id.map(str::to_string),
223 turn_index: None,
224 protocol_iteration: None,
225 },
226 replay_discriminator,
227 RuntimeEffectKind::Direct,
228 replay_key,
229 )
230 .with_caused_by(caused_by)
231}
232
233pub(crate) fn direct_request_discriminator<T>(
234 request: &T,
235 explicit_replay: Option<&RuntimeReplay>,
236 caused_by: Option<&CausalRef>,
237) -> Result<String, RuntimeEffectControllerError>
238where
239 T: Serialize,
240{
241 let cause_discriminator = caused_by
242 .map(causal_replay_discriminator)
243 .unwrap_or_default();
244 if let Some(replay) = explicit_replay.filter(|replay| !replay.key.is_empty()) {
245 return Ok(format!("{cause_discriminator}request:{}", replay.key));
246 }
247 let digest = crate::stable_hash::stable_json_sha256_hex(request).map_err(|err| {
248 RuntimeEffectControllerError::new(
249 "runtime_effect_discriminator",
250 format!("failed to serialize runtime effect discriminator: {err}"),
251 )
252 })?;
253 Ok(format!("{cause_discriminator}sha256:{digest}"))
254}
255
256fn causal_replay_discriminator(caused_by: &CausalRef) -> String {
257 match caused_by {
258 CausalRef::Turn {
259 session_id,
260 turn_id,
261 } => format!("cause:turn:{session_id}:{turn_id}:"),
262 CausalRef::Effect {
263 session_id,
264 turn_id,
265 effect_id,
266 } => {
267 let turn = turn_id.as_deref().unwrap_or("");
268 format!("cause:effect:{session_id}:{turn}:{effect_id}:")
269 }
270 CausalRef::ToolCall {
271 session_id,
272 call_id,
273 } => format!("cause:tool_call:{session_id}:{call_id}:"),
274 CausalRef::Process { process_id } => format!("cause:process:{process_id}:"),
275 CausalRef::ProcessEvent {
276 process_id,
277 sequence,
278 } => format!("cause:process_event:{process_id}:{sequence}:"),
279 CausalRef::TriggerOccurrence { occurrence_id } => format!("cause:trigger:{occurrence_id}:"),
280 CausalRef::SessionNode {
281 session_id,
282 node_id,
283 } => format!("cause:session_node:{session_id}:{node_id}:"),
284 }
285}