Skip to main content

entelix_session/
audit_sink.rs

//! `SessionAuditSink` — adapter that maps `entelix_core::AuditSink`
//! `record_*` calls onto fire-and-forget `SessionLog::append` calls.
//!
//! The split between `AuditSink` (typed emit channel pinned in
//! `entelix-core`) and `SessionLog` (persistence shape pinned here)
//! lets emitters in `entelix-tools`, `entelix-graph`, `entelix-agents`
//! depend only on `entelix-core` while still landing their events
//! in the durable Tier-2 log.
9
10use std::sync::Arc;
11
12use chrono::Utc;
13
14use entelix_core::{AuditSink, ThreadKey};
15
16use crate::event::GraphEvent;
17use crate::log::SessionLog;
18
19/// Adapter that fans `AuditSink::record_*` calls into a durable
20/// [`SessionLog`].
21///
22/// Cloning is cheap (`Arc`-shared backend handle). One adapter per
23/// agent run is the typical pattern — operators construct it next to
24/// the `SessionLog` itself and stash it on every spawned
25/// [`entelix_core::context::ExecutionContext`] via
26/// [`ExecutionContext::with_audit_sink`].
27///
28/// [`ExecutionContext::with_audit_sink`]:
29///     entelix_core::context::ExecutionContext::with_audit_sink
30pub struct SessionAuditSink {
31    log: Arc<dyn SessionLog>,
32    key: ThreadKey,
33}
34
35impl SessionAuditSink {
36    /// Build an adapter pinned to one `(tenant_id, thread_id)` pair.
37    /// Multi-thread runs allocate one adapter per thread; the
38    /// adapter is stateless beyond the `Arc` handle so cloning a
39    /// sink and re-keying via [`Self::with_thread_key`] is also a
40    /// valid pattern.
41    #[must_use]
42    pub const fn new(log: Arc<dyn SessionLog>, key: ThreadKey) -> Self {
43        Self { log, key }
44    }
45
46    /// Re-target an existing adapter at a different `ThreadKey`.
47    /// Useful when a parent run spawns a sub-thread and wants the
48    /// sub-thread's events to land under a distinct audit scope.
49    #[must_use]
50    pub fn with_thread_key(self, key: ThreadKey) -> Self {
51        Self { log: self.log, key }
52    }
53}
54
55impl std::fmt::Debug for SessionAuditSink {
56    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57        f.debug_struct("SessionAuditSink")
58            .field("key", &self.key)
59            .finish_non_exhaustive()
60    }
61}
62
63/// Fire-and-forget append helper. The audit channel must never
64/// block the agent — a dropped event is logged via `tracing::warn!`
65/// and the run continues.
66fn spawn_append(log: Arc<dyn SessionLog>, key: ThreadKey, event: GraphEvent) {
67    tokio::spawn(async move {
68        if let Err(err) = log.append(&key, &[event]).await {
69            tracing::warn!(
70                target: "entelix_session::audit_sink",
71                tenant_id = %key.tenant_id(),
72                thread_id = %key.thread_id(),
73                error = %err,
74                "audit-sink append failed; event dropped"
75            );
76        }
77    });
78}
79
80impl AuditSink for SessionAuditSink {
81    fn record_sub_agent_invoked(&self, agent_id: &str, sub_thread_id: &str) {
82        spawn_append(
83            Arc::clone(&self.log),
84            self.key.clone(),
85            GraphEvent::SubAgentInvoked {
86                agent_id: agent_id.to_owned(),
87                sub_thread_id: sub_thread_id.to_owned(),
88                timestamp: Utc::now(),
89            },
90        );
91    }
92
93    fn record_agent_handoff(&self, from: Option<&str>, to: &str) {
94        spawn_append(
95            Arc::clone(&self.log),
96            self.key.clone(),
97            GraphEvent::AgentHandoff {
98                from: from.map(str::to_owned),
99                to: to.to_owned(),
100                timestamp: Utc::now(),
101            },
102        );
103    }
104
105    fn record_resumed(&self, from_checkpoint: &str) {
106        spawn_append(
107            Arc::clone(&self.log),
108            self.key.clone(),
109            GraphEvent::Resumed {
110                from_checkpoint: from_checkpoint.to_owned(),
111                timestamp: Utc::now(),
112            },
113        );
114    }
115
116    fn record_memory_recall(&self, tier: &str, namespace_key: &str, hits: usize) {
117        spawn_append(
118            Arc::clone(&self.log),
119            self.key.clone(),
120            GraphEvent::MemoryRecall {
121                tier: tier.to_owned(),
122                namespace_key: namespace_key.to_owned(),
123                hits,
124                timestamp: Utc::now(),
125            },
126        );
127    }
128
129    fn record_usage_limit_exceeded(&self, breach: &entelix_core::UsageLimitBreach) {
130        spawn_append(
131            Arc::clone(&self.log),
132            self.key.clone(),
133            GraphEvent::UsageLimitExceeded {
134                breach: breach.clone(),
135                timestamp: Utc::now(),
136            },
137        );
138    }
139
140    fn record_context_compacted(&self, dropped_chars: usize, retained_chars: usize) {
141        spawn_append(
142            Arc::clone(&self.log),
143            self.key.clone(),
144            GraphEvent::ContextCompacted {
145                dropped_chars,
146                retained_chars,
147                timestamp: Utc::now(),
148            },
149        );
150    }
151}