Skip to main content

awaken_server_contract/contract/
durable_event_sink.rs

1//! Durable event sink wiring for canonical event capture.
2
3use std::collections::BTreeSet;
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use parking_lot::Mutex;
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10
11use super::commit_coordinator::CanonicalEventStager;
12use super::event::AgentEvent;
13use super::event_sink::EventSink;
14use super::event_store::{
15    CanonicalEventDraft, CanonicalEventKind, EventScope, EventStoreError, EventVisibility,
16    FidelityClass,
17};
18use super::lifecycle::TerminationReason;
19use super::suspension::{ToolCallOutcome, ToolCallResumeMode};
20use super::tool::ToolStatus;
21
22mod compaction;
23use compaction::CompactionObservation;
24
25/// Runtime event durability mode used by [`DurableEventSink`].
26#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
27#[serde(rename_all = "snake_case")]
28pub enum RuntimeEventDurability {
29    /// Do not persist runtime events.
30    Disabled,
31    /// Persist committed, domain, and control events; skip streaming observations.
32    Compacted,
33    /// Persist all normalized runtime events, including streaming observations.
34    FullFidelity,
35}
36
37impl RuntimeEventDurability {
38    /// Return whether an event with `fidelity` should be appended.
39    #[must_use]
40    pub const fn should_persist(self, fidelity: FidelityClass) -> bool {
41        match self {
42            Self::Disabled => false,
43            Self::Compacted => !matches!(fidelity, FidelityClass::ObservedRuntimeEvent),
44            Self::FullFidelity => true,
45        }
46    }
47}
48
49/// A normalized canonical event draft with its durability class.
50#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
51pub struct NormalizedCanonicalEvent {
52    pub fidelity: FidelityClass,
53    pub draft: CanonicalEventDraft,
54}
55
56impl NormalizedCanonicalEvent {
57    /// Build a normalized event after validating the draft.
58    pub fn new(
59        fidelity: FidelityClass,
60        draft: CanonicalEventDraft,
61    ) -> Result<Self, EventStoreError> {
62        draft.validate()?;
63        Ok(Self { fidelity, draft })
64    }
65}
66
67/// Converts runtime [`AgentEvent`]s to protocol-neutral canonical event drafts.
68pub trait AgentEventNormalizer: Send + Sync {
69    /// Normalize one runtime event.
70    ///
71    /// Returning `None` means the event is intentionally not represented as a
72    /// canonical fact. Errors indicate a durable capture failure.
73    fn normalize(
74        &self,
75        event: &AgentEvent,
76    ) -> Result<Option<NormalizedCanonicalEvent>, EventStoreError>;
77
78    /// Normalize one runtime event into one or more canonical facts.
79    ///
80    /// The default preserves the original one-event contract. Normalizers may
81    /// override this for coarse companion facts such as permission request
82    /// events derived from the same runtime boundary.
83    fn normalize_many(
84        &self,
85        event: &AgentEvent,
86    ) -> Result<Vec<NormalizedCanonicalEvent>, EventStoreError> {
87        Ok(self.normalize(event)?.into_iter().collect())
88    }
89}
90
91/// Scope and metadata used when normalizing runtime events that do not carry
92/// thread or run identifiers themselves.
93#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
94pub struct AgentEventNormalizationContext {
95    pub thread_id: String,
96    pub run_id: String,
97    pub origin: String,
98    #[serde(default, skip_serializing_if = "Option::is_none")]
99    pub correlation_id: Option<String>,
100}
101
102impl AgentEventNormalizationContext {
103    /// Create a normalization context for one thread/run pair.
104    pub fn new(
105        thread_id: impl Into<String>,
106        run_id: impl Into<String>,
107        origin: impl Into<String>,
108    ) -> Result<Self, EventStoreError> {
109        let context = Self {
110            thread_id: thread_id.into(),
111            run_id: run_id.into(),
112            origin: origin.into(),
113            correlation_id: None,
114        };
115        context.validate()?;
116        Ok(context)
117    }
118
119    /// Attach a correlation id used for tracing and diagnostics.
120    #[must_use]
121    pub fn with_correlation_id(mut self, correlation_id: impl Into<String>) -> Self {
122        self.correlation_id = Some(correlation_id.into());
123        self
124    }
125
126    fn validate(&self) -> Result<(), EventStoreError> {
127        reject_blank("thread_id", &self.thread_id)?;
128        reject_blank("run_id", &self.run_id)?;
129        reject_blank("origin", &self.origin)?;
130        Ok(())
131    }
132}
133
134/// Default normalizer for one runtime stream.
135#[derive(Debug)]
136pub struct ScopedAgentEventNormalizer {
137    context: AgentEventNormalizationContext,
138    started_runs: Mutex<BTreeSet<String>>,
139    terminal_runs: Mutex<BTreeSet<String>>,
140    compaction: Mutex<CompactionObservation>,
141}
142
143impl ScopedAgentEventNormalizer {
144    /// Create a normalizer for one thread/run stream.
145    #[must_use]
146    pub fn new(context: AgentEventNormalizationContext) -> Self {
147        Self {
148            context,
149            started_runs: Mutex::new(BTreeSet::new()),
150            terminal_runs: Mutex::new(BTreeSet::new()),
151            compaction: Mutex::new(CompactionObservation::default()),
152        }
153    }
154
155    /// Create a normalizer for a stream that resumes an already-started run.
156    #[must_use]
157    pub fn new_resumed(context: AgentEventNormalizationContext) -> Self {
158        let run_id = context.run_id.clone();
159        let normalizer = Self::new(context);
160        normalizer.started_runs.lock().insert(run_id);
161        normalizer
162    }
163
164    fn scopes_for(&self, thread_id: &str, run_id: &str) -> Vec<EventScope> {
165        vec![EventScope::thread(thread_id), EventScope::run(run_id)]
166    }
167
168    fn context_scopes(&self) -> Vec<EventScope> {
169        self.scopes_for(&self.context.thread_id, &self.context.run_id)
170    }
171
172    fn build(
173        &self,
174        fidelity: FidelityClass,
175        event_kind: &str,
176        scopes: Vec<EventScope>,
177        payload: Value,
178    ) -> Result<NormalizedCanonicalEvent, EventStoreError> {
179        let mut draft = CanonicalEventDraft::new(
180            scopes,
181            CanonicalEventKind::new(event_kind)?,
182            payload,
183            self.context.origin.clone(),
184        )?;
185        draft.visibility = EventVisibility::Public;
186        draft.correlation_id = self.context.correlation_id.clone();
187        NormalizedCanonicalEvent::new(fidelity, draft)
188    }
189}
190
191impl AgentEventNormalizer for ScopedAgentEventNormalizer {
192    fn normalize_many(
193        &self,
194        event: &AgentEvent,
195    ) -> Result<Vec<NormalizedCanonicalEvent>, EventStoreError> {
196        let mut events = self.normalize(event)?.into_iter().collect::<Vec<_>>();
197        if let Some(permission_requested) = self.tool_permission_requested(event)? {
198            events.push(permission_requested);
199        }
200        events.extend(self.context_compaction_events(event)?);
201        Ok(events)
202    }
203
204    fn normalize(
205        &self,
206        event: &AgentEvent,
207    ) -> Result<Option<NormalizedCanonicalEvent>, EventStoreError> {
208        let (fidelity, kind, scopes) = match event {
209            AgentEvent::RunStart {
210                thread_id, run_id, ..
211            } => {
212                let kind = {
213                    let mut started = self.started_runs.lock();
214                    if started.insert(run_id.clone()) {
215                        "RunStarted"
216                    } else {
217                        "RunResumed"
218                    }
219                };
220                (
221                    FidelityClass::DomainEvent,
222                    kind,
223                    self.scopes_for(thread_id, run_id),
224                )
225            }
226            AgentEvent::RunFinish {
227                thread_id,
228                run_id,
229                termination,
230                ..
231            } => {
232                let already_terminal = {
233                    let mut terminal = self.terminal_runs.lock();
234                    !terminal.insert(run_id.clone())
235                };
236                if already_terminal {
237                    return Ok(None);
238                }
239                (
240                    FidelityClass::DomainEvent,
241                    run_finish_kind(termination),
242                    self.scopes_for(thread_id, run_id),
243                )
244            }
245            AgentEvent::TextDelta { .. } => (
246                FidelityClass::ObservedRuntimeEvent,
247                "TextDeltaObserved",
248                self.context_scopes(),
249            ),
250            AgentEvent::ReasoningDelta { .. } => (
251                FidelityClass::ObservedRuntimeEvent,
252                "ReasoningDeltaObserved",
253                self.context_scopes(),
254            ),
255            AgentEvent::ReasoningEncryptedValue { .. } => (
256                FidelityClass::ObservedRuntimeEvent,
257                "ReasoningEncryptedValueObserved",
258                self.context_scopes(),
259            ),
260            AgentEvent::ToolCallStart { .. } => (
261                FidelityClass::ObservedRuntimeEvent,
262                "ToolCallStarted",
263                self.context_scopes(),
264            ),
265            AgentEvent::ToolCallDelta { .. } => (
266                FidelityClass::ObservedRuntimeEvent,
267                "ToolCallDeltaObserved",
268                self.context_scopes(),
269            ),
270            AgentEvent::ToolCallReady { .. } => (
271                FidelityClass::CommittedRuntimeEvent,
272                "ToolCallReady",
273                self.context_scopes(),
274            ),
275            AgentEvent::ToolCallDone {
276                result, outcome, ..
277            } => {
278                // ADR-0034 D11: ToolCallSuspended/Rejected/TimedOut are
279                // canonical lifecycle DomainEvents even though the source
280                // AgentEvent::ToolCallDone is a CommittedRuntimeEvent.
281                let (fidelity, kind) = if *outcome == ToolCallOutcome::Suspended {
282                    (FidelityClass::DomainEvent, "ToolCallSuspended")
283                } else if *outcome == ToolCallOutcome::Failed
284                    && result.metadata.contains_key("rejected")
285                {
286                    (FidelityClass::DomainEvent, "ToolCallRejected")
287                } else if *outcome == ToolCallOutcome::Failed
288                    && result
289                        .metadata
290                        .get("timed_out")
291                        .and_then(Value::as_bool)
292                        .unwrap_or(false)
293                {
294                    (FidelityClass::DomainEvent, "ToolCallTimedOut")
295                } else {
296                    (FidelityClass::CommittedRuntimeEvent, "ToolCallDone")
297                };
298                (fidelity, kind, self.context_scopes())
299            }
300            AgentEvent::ToolCallStreamDelta { .. } => (
301                FidelityClass::ObservedRuntimeEvent,
302                "ToolCallStreamDeltaObserved",
303                self.context_scopes(),
304            ),
305            AgentEvent::ToolCallResumed { .. } => (
306                FidelityClass::ControlEvent,
307                "ToolCallResumed",
308                self.context_scopes(),
309            ),
310            AgentEvent::ToolCallCancel { .. } => (
311                // ADR-0034 D11: ToolCallCancelled is a canonical lifecycle
312                // DomainEvent.
313                FidelityClass::DomainEvent,
314                "ToolCallCancelled",
315                self.context_scopes(),
316            ),
317            AgentEvent::StreamReset { .. } => (
318                FidelityClass::CommittedRuntimeEvent,
319                "StreamReset",
320                self.context_scopes(),
321            ),
322            AgentEvent::StepStart { .. } => (
323                FidelityClass::ObservedRuntimeEvent,
324                "StepStarted",
325                self.context_scopes(),
326            ),
327            AgentEvent::StepEnd => (
328                FidelityClass::ObservedRuntimeEvent,
329                "StepEnded",
330                self.context_scopes(),
331            ),
332            AgentEvent::InferenceComplete { .. } => (
333                FidelityClass::CommittedRuntimeEvent,
334                "InferenceComplete",
335                self.context_scopes(),
336            ),
337            AgentEvent::MessagesSnapshot { .. } => (
338                FidelityClass::ObservedRuntimeEvent,
339                "MessagesSnapshotObserved",
340                self.context_scopes(),
341            ),
342            AgentEvent::ActivitySnapshot { .. } => (
343                FidelityClass::ObservedRuntimeEvent,
344                "ActivitySnapshotObserved",
345                self.context_scopes(),
346            ),
347            AgentEvent::ActivityDelta { .. } => (
348                FidelityClass::ObservedRuntimeEvent,
349                "ActivityDeltaObserved",
350                self.context_scopes(),
351            ),
352            AgentEvent::StateSnapshot { .. } => (
353                FidelityClass::ObservedRuntimeEvent,
354                "StateSnapshotObserved",
355                self.context_scopes(),
356            ),
357            AgentEvent::StateDelta { .. } => (
358                FidelityClass::ObservedRuntimeEvent,
359                "StateDeltaObserved",
360                self.context_scopes(),
361            ),
362            AgentEvent::Error { .. } => (
363                FidelityClass::CommittedRuntimeEvent,
364                "ErrorRecorded",
365                self.context_scopes(),
366            ),
367        };
368
369        let payload = serde_json::to_value(event)
370            .map_err(|error| EventStoreError::Serialization(error.to_string()))?;
371        self.build(fidelity, kind, scopes, payload).map(Some)
372    }
373}
374
375impl ScopedAgentEventNormalizer {
376    fn tool_permission_requested(
377        &self,
378        event: &AgentEvent,
379    ) -> Result<Option<NormalizedCanonicalEvent>, EventStoreError> {
380        let AgentEvent::ToolCallDone {
381            result, outcome, ..
382        } = event
383        else {
384            return Ok(None);
385        };
386        if *outcome != ToolCallOutcome::Suspended || result.status != ToolStatus::Pending {
387            return Ok(None);
388        }
389        let Some(ticket) = result.suspension.as_ref() else {
390            return Ok(None);
391        };
392        if ticket.resume_mode != ToolCallResumeMode::ReplayToolCall
393            || ticket.suspension.id.trim().is_empty()
394        {
395            return Ok(None);
396        }
397        let payload = serde_json::to_value(event)
398            .map_err(|error| EventStoreError::Serialization(error.to_string()))?;
399        self.build(
400            FidelityClass::DomainEvent,
401            "ToolPermissionRequested",
402            self.context_scopes(),
403            payload,
404        )
405        .map(Some)
406    }
407}
408
409/// EventSink wrapper that forwards live events and stages canonical drafts.
410pub struct DurableEventSink {
411    inner: Arc<dyn EventSink>,
412    stager: Arc<dyn CanonicalEventStager>,
413    normalizer: Arc<dyn AgentEventNormalizer>,
414    mode: RuntimeEventDurability,
415}
416
417impl DurableEventSink {
418    /// Create a durable sink wrapper.
419    #[must_use]
420    pub fn new(
421        inner: Arc<dyn EventSink>,
422        stager: Arc<dyn CanonicalEventStager>,
423        normalizer: Arc<dyn AgentEventNormalizer>,
424        mode: RuntimeEventDurability,
425    ) -> Self {
426        Self {
427            inner,
428            stager,
429            normalizer,
430            mode,
431        }
432    }
433}
434
435#[async_trait]
436impl EventSink for DurableEventSink {
437    async fn emit(&self, event: AgentEvent) {
438        self.inner.emit(event.clone()).await;
439
440        let normalized = match self.normalizer.normalize_many(&event) {
441            Ok(normalized) => normalized,
442            Err(error) => {
443                tracing::error!(
444                    error = %error,
445                    "durable event sink normalizer failed; live event was forwarded without canonical staging"
446                );
447                return;
448            }
449        };
450
451        for normalized in normalized {
452            if !self.mode.should_persist(normalized.fidelity) {
453                continue;
454            }
455            self.stager.stage(normalized.draft);
456        }
457    }
458
459    async fn close(&self) {
460        self.inner.close().await;
461    }
462}
463
464fn run_finish_kind(termination: &TerminationReason) -> &'static str {
465    match termination {
466        TerminationReason::NaturalEnd | TerminationReason::BehaviorRequested => "RunFinished",
467        TerminationReason::Suspended => "RunSuspended",
468        TerminationReason::Cancelled => "RunCancelled",
469        TerminationReason::Error(_) => "RunErrored",
470        TerminationReason::Stopped(_) | TerminationReason::Blocked(_) => "RunTerminated",
471    }
472}
473
474fn reject_blank(field: &str, value: &str) -> Result<(), EventStoreError> {
475    if value.trim().is_empty() {
476        return Err(EventStoreError::Validation(format!("{field} is required")));
477    }
478    Ok(())
479}
480
481#[cfg(test)]
482mod tests;