1use std::collections::BTreeSet;
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use parking_lot::Mutex;
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10
11use super::commit_coordinator::CanonicalEventStager;
12use super::event::AgentEvent;
13use super::event_sink::EventSink;
14use super::event_store::{
15 CanonicalEventDraft, CanonicalEventKind, EventScope, EventStoreError, EventVisibility,
16 FidelityClass,
17};
18use super::lifecycle::TerminationReason;
19use super::suspension::{ToolCallOutcome, ToolCallResumeMode};
20use super::tool::ToolStatus;
21
22mod compaction;
23use compaction::CompactionObservation;
24
25#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
27#[serde(rename_all = "snake_case")]
28pub enum RuntimeEventDurability {
29 Disabled,
31 Compacted,
33 FullFidelity,
35}
36
37impl RuntimeEventDurability {
38 #[must_use]
40 pub const fn should_persist(self, fidelity: FidelityClass) -> bool {
41 match self {
42 Self::Disabled => false,
43 Self::Compacted => !matches!(fidelity, FidelityClass::ObservedRuntimeEvent),
44 Self::FullFidelity => true,
45 }
46 }
47}
48
49#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
51pub struct NormalizedCanonicalEvent {
52 pub fidelity: FidelityClass,
53 pub draft: CanonicalEventDraft,
54}
55
56impl NormalizedCanonicalEvent {
57 pub fn new(
59 fidelity: FidelityClass,
60 draft: CanonicalEventDraft,
61 ) -> Result<Self, EventStoreError> {
62 draft.validate()?;
63 Ok(Self { fidelity, draft })
64 }
65}
66
67pub trait AgentEventNormalizer: Send + Sync {
69 fn normalize(
74 &self,
75 event: &AgentEvent,
76 ) -> Result<Option<NormalizedCanonicalEvent>, EventStoreError>;
77
78 fn normalize_many(
84 &self,
85 event: &AgentEvent,
86 ) -> Result<Vec<NormalizedCanonicalEvent>, EventStoreError> {
87 Ok(self.normalize(event)?.into_iter().collect())
88 }
89}
90
91#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
94pub struct AgentEventNormalizationContext {
95 pub thread_id: String,
96 pub run_id: String,
97 pub origin: String,
98 #[serde(default, skip_serializing_if = "Option::is_none")]
99 pub correlation_id: Option<String>,
100}
101
102impl AgentEventNormalizationContext {
103 pub fn new(
105 thread_id: impl Into<String>,
106 run_id: impl Into<String>,
107 origin: impl Into<String>,
108 ) -> Result<Self, EventStoreError> {
109 let context = Self {
110 thread_id: thread_id.into(),
111 run_id: run_id.into(),
112 origin: origin.into(),
113 correlation_id: None,
114 };
115 context.validate()?;
116 Ok(context)
117 }
118
119 #[must_use]
121 pub fn with_correlation_id(mut self, correlation_id: impl Into<String>) -> Self {
122 self.correlation_id = Some(correlation_id.into());
123 self
124 }
125
126 fn validate(&self) -> Result<(), EventStoreError> {
127 reject_blank("thread_id", &self.thread_id)?;
128 reject_blank("run_id", &self.run_id)?;
129 reject_blank("origin", &self.origin)?;
130 Ok(())
131 }
132}
133
134#[derive(Debug)]
136pub struct ScopedAgentEventNormalizer {
137 context: AgentEventNormalizationContext,
138 started_runs: Mutex<BTreeSet<String>>,
139 terminal_runs: Mutex<BTreeSet<String>>,
140 compaction: Mutex<CompactionObservation>,
141}
142
143impl ScopedAgentEventNormalizer {
144 #[must_use]
146 pub fn new(context: AgentEventNormalizationContext) -> Self {
147 Self {
148 context,
149 started_runs: Mutex::new(BTreeSet::new()),
150 terminal_runs: Mutex::new(BTreeSet::new()),
151 compaction: Mutex::new(CompactionObservation::default()),
152 }
153 }
154
155 #[must_use]
157 pub fn new_resumed(context: AgentEventNormalizationContext) -> Self {
158 let run_id = context.run_id.clone();
159 let normalizer = Self::new(context);
160 normalizer.started_runs.lock().insert(run_id);
161 normalizer
162 }
163
164 fn scopes_for(&self, thread_id: &str, run_id: &str) -> Vec<EventScope> {
165 vec![EventScope::thread(thread_id), EventScope::run(run_id)]
166 }
167
168 fn context_scopes(&self) -> Vec<EventScope> {
169 self.scopes_for(&self.context.thread_id, &self.context.run_id)
170 }
171
172 fn build(
173 &self,
174 fidelity: FidelityClass,
175 event_kind: &str,
176 scopes: Vec<EventScope>,
177 payload: Value,
178 ) -> Result<NormalizedCanonicalEvent, EventStoreError> {
179 let mut draft = CanonicalEventDraft::new(
180 scopes,
181 CanonicalEventKind::new(event_kind)?,
182 payload,
183 self.context.origin.clone(),
184 )?;
185 draft.visibility = EventVisibility::Public;
186 draft.correlation_id = self.context.correlation_id.clone();
187 NormalizedCanonicalEvent::new(fidelity, draft)
188 }
189}
190
191impl AgentEventNormalizer for ScopedAgentEventNormalizer {
192 fn normalize_many(
193 &self,
194 event: &AgentEvent,
195 ) -> Result<Vec<NormalizedCanonicalEvent>, EventStoreError> {
196 let mut events = self.normalize(event)?.into_iter().collect::<Vec<_>>();
197 if let Some(permission_requested) = self.tool_permission_requested(event)? {
198 events.push(permission_requested);
199 }
200 events.extend(self.context_compaction_events(event)?);
201 Ok(events)
202 }
203
204 fn normalize(
205 &self,
206 event: &AgentEvent,
207 ) -> Result<Option<NormalizedCanonicalEvent>, EventStoreError> {
208 let (fidelity, kind, scopes) = match event {
209 AgentEvent::RunStart {
210 thread_id, run_id, ..
211 } => {
212 let kind = {
213 let mut started = self.started_runs.lock();
214 if started.insert(run_id.clone()) {
215 "RunStarted"
216 } else {
217 "RunResumed"
218 }
219 };
220 (
221 FidelityClass::DomainEvent,
222 kind,
223 self.scopes_for(thread_id, run_id),
224 )
225 }
226 AgentEvent::RunFinish {
227 thread_id,
228 run_id,
229 termination,
230 ..
231 } => {
232 let already_terminal = {
233 let mut terminal = self.terminal_runs.lock();
234 !terminal.insert(run_id.clone())
235 };
236 if already_terminal {
237 return Ok(None);
238 }
239 (
240 FidelityClass::DomainEvent,
241 run_finish_kind(termination),
242 self.scopes_for(thread_id, run_id),
243 )
244 }
245 AgentEvent::TextDelta { .. } => (
246 FidelityClass::ObservedRuntimeEvent,
247 "TextDeltaObserved",
248 self.context_scopes(),
249 ),
250 AgentEvent::ReasoningDelta { .. } => (
251 FidelityClass::ObservedRuntimeEvent,
252 "ReasoningDeltaObserved",
253 self.context_scopes(),
254 ),
255 AgentEvent::ReasoningEncryptedValue { .. } => (
256 FidelityClass::ObservedRuntimeEvent,
257 "ReasoningEncryptedValueObserved",
258 self.context_scopes(),
259 ),
260 AgentEvent::ToolCallStart { .. } => (
261 FidelityClass::ObservedRuntimeEvent,
262 "ToolCallStarted",
263 self.context_scopes(),
264 ),
265 AgentEvent::ToolCallDelta { .. } => (
266 FidelityClass::ObservedRuntimeEvent,
267 "ToolCallDeltaObserved",
268 self.context_scopes(),
269 ),
270 AgentEvent::ToolCallReady { .. } => (
271 FidelityClass::CommittedRuntimeEvent,
272 "ToolCallReady",
273 self.context_scopes(),
274 ),
275 AgentEvent::ToolCallDone {
276 result, outcome, ..
277 } => {
278 let (fidelity, kind) = if *outcome == ToolCallOutcome::Suspended {
282 (FidelityClass::DomainEvent, "ToolCallSuspended")
283 } else if *outcome == ToolCallOutcome::Failed
284 && result.metadata.contains_key("rejected")
285 {
286 (FidelityClass::DomainEvent, "ToolCallRejected")
287 } else if *outcome == ToolCallOutcome::Failed
288 && result
289 .metadata
290 .get("timed_out")
291 .and_then(Value::as_bool)
292 .unwrap_or(false)
293 {
294 (FidelityClass::DomainEvent, "ToolCallTimedOut")
295 } else {
296 (FidelityClass::CommittedRuntimeEvent, "ToolCallDone")
297 };
298 (fidelity, kind, self.context_scopes())
299 }
300 AgentEvent::ToolCallStreamDelta { .. } => (
301 FidelityClass::ObservedRuntimeEvent,
302 "ToolCallStreamDeltaObserved",
303 self.context_scopes(),
304 ),
305 AgentEvent::ToolCallResumed { .. } => (
306 FidelityClass::ControlEvent,
307 "ToolCallResumed",
308 self.context_scopes(),
309 ),
310 AgentEvent::ToolCallCancel { .. } => (
311 FidelityClass::DomainEvent,
314 "ToolCallCancelled",
315 self.context_scopes(),
316 ),
317 AgentEvent::StreamReset { .. } => (
318 FidelityClass::CommittedRuntimeEvent,
319 "StreamReset",
320 self.context_scopes(),
321 ),
322 AgentEvent::StepStart { .. } => (
323 FidelityClass::ObservedRuntimeEvent,
324 "StepStarted",
325 self.context_scopes(),
326 ),
327 AgentEvent::StepEnd => (
328 FidelityClass::ObservedRuntimeEvent,
329 "StepEnded",
330 self.context_scopes(),
331 ),
332 AgentEvent::InferenceComplete { .. } => (
333 FidelityClass::CommittedRuntimeEvent,
334 "InferenceComplete",
335 self.context_scopes(),
336 ),
337 AgentEvent::MessagesSnapshot { .. } => (
338 FidelityClass::ObservedRuntimeEvent,
339 "MessagesSnapshotObserved",
340 self.context_scopes(),
341 ),
342 AgentEvent::ActivitySnapshot { .. } => (
343 FidelityClass::ObservedRuntimeEvent,
344 "ActivitySnapshotObserved",
345 self.context_scopes(),
346 ),
347 AgentEvent::ActivityDelta { .. } => (
348 FidelityClass::ObservedRuntimeEvent,
349 "ActivityDeltaObserved",
350 self.context_scopes(),
351 ),
352 AgentEvent::StateSnapshot { .. } => (
353 FidelityClass::ObservedRuntimeEvent,
354 "StateSnapshotObserved",
355 self.context_scopes(),
356 ),
357 AgentEvent::StateDelta { .. } => (
358 FidelityClass::ObservedRuntimeEvent,
359 "StateDeltaObserved",
360 self.context_scopes(),
361 ),
362 AgentEvent::Error { .. } => (
363 FidelityClass::CommittedRuntimeEvent,
364 "ErrorRecorded",
365 self.context_scopes(),
366 ),
367 };
368
369 let payload = serde_json::to_value(event)
370 .map_err(|error| EventStoreError::Serialization(error.to_string()))?;
371 self.build(fidelity, kind, scopes, payload).map(Some)
372 }
373}
374
375impl ScopedAgentEventNormalizer {
376 fn tool_permission_requested(
377 &self,
378 event: &AgentEvent,
379 ) -> Result<Option<NormalizedCanonicalEvent>, EventStoreError> {
380 let AgentEvent::ToolCallDone {
381 result, outcome, ..
382 } = event
383 else {
384 return Ok(None);
385 };
386 if *outcome != ToolCallOutcome::Suspended || result.status != ToolStatus::Pending {
387 return Ok(None);
388 }
389 let Some(ticket) = result.suspension.as_ref() else {
390 return Ok(None);
391 };
392 if ticket.resume_mode != ToolCallResumeMode::ReplayToolCall
393 || ticket.suspension.id.trim().is_empty()
394 {
395 return Ok(None);
396 }
397 let payload = serde_json::to_value(event)
398 .map_err(|error| EventStoreError::Serialization(error.to_string()))?;
399 self.build(
400 FidelityClass::DomainEvent,
401 "ToolPermissionRequested",
402 self.context_scopes(),
403 payload,
404 )
405 .map(Some)
406 }
407}
408
409pub struct DurableEventSink {
411 inner: Arc<dyn EventSink>,
412 stager: Arc<dyn CanonicalEventStager>,
413 normalizer: Arc<dyn AgentEventNormalizer>,
414 mode: RuntimeEventDurability,
415}
416
417impl DurableEventSink {
418 #[must_use]
420 pub fn new(
421 inner: Arc<dyn EventSink>,
422 stager: Arc<dyn CanonicalEventStager>,
423 normalizer: Arc<dyn AgentEventNormalizer>,
424 mode: RuntimeEventDurability,
425 ) -> Self {
426 Self {
427 inner,
428 stager,
429 normalizer,
430 mode,
431 }
432 }
433}
434
435#[async_trait]
436impl EventSink for DurableEventSink {
437 async fn emit(&self, event: AgentEvent) {
438 self.inner.emit(event.clone()).await;
439
440 let normalized = match self.normalizer.normalize_many(&event) {
441 Ok(normalized) => normalized,
442 Err(error) => {
443 tracing::error!(
444 error = %error,
445 "durable event sink normalizer failed; live event was forwarded without canonical staging"
446 );
447 return;
448 }
449 };
450
451 for normalized in normalized {
452 if !self.mode.should_persist(normalized.fidelity) {
453 continue;
454 }
455 self.stager.stage(normalized.draft);
456 }
457 }
458
459 async fn close(&self) {
460 self.inner.close().await;
461 }
462}
463
464fn run_finish_kind(termination: &TerminationReason) -> &'static str {
465 match termination {
466 TerminationReason::NaturalEnd | TerminationReason::BehaviorRequested => "RunFinished",
467 TerminationReason::Suspended => "RunSuspended",
468 TerminationReason::Cancelled => "RunCancelled",
469 TerminationReason::Error(_) => "RunErrored",
470 TerminationReason::Stopped(_) | TerminationReason::Blocked(_) => "RunTerminated",
471 }
472}
473
474fn reject_blank(field: &str, value: &str) -> Result<(), EventStoreError> {
475 if value.trim().is_empty() {
476 return Err(EventStoreError::Validation(format!("{field} is required")));
477 }
478 Ok(())
479}
480
481#[cfg(test)]
482mod tests;