Skip to main content

nexo_driver_loop/
events.rs

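//! `DriverEvent` enum + sink trait. Subjects when wired to NATS:
//! `agent.driver.{goal,attempt}.{started,completed}`,
//! `agent.driver.{decision,acceptance,budget.exhausted,escalate}`,
//! `agent.driver.{replay,progress,auto_dream}`,
//! `agent.driver.compact` plus `agent.driver.compact.{completed,summary_stored}`,
//! `agent.driver.extract_memories.{completed,skipped}`.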
4
5use async_trait::async_trait;
6use nexo_driver_types::CompactTrigger;
7use nexo_driver_types::{
8    AcceptanceVerdict, AttemptResult, AutoDreamOutcomeKind, BudgetAxis, BudgetUsage, Decision,
9    Goal, GoalId,
10};
11use serde::{Deserialize, Serialize};
12
13use crate::error::DriverError;
14use crate::orchestrator::GoalOutcome;
15use crate::replay::ReplayDecision;
16
17/// Why extractMemories skipped a turn.
18#[derive(Clone, Debug, Serialize, Deserialize)]
19#[serde(rename_all = "snake_case")]
20pub enum ExtractSkipReason {
21    Disabled,
22    Throttled,
23    InProgress,
24    CircuitBreakerOpen,
25    MainAgentWrote,
26}
27
// `AutoDreamOutcomeKind` is defined in `nexo-driver-types` and imported
// above, since events.rs is where it is naturally consumed. The
// `AutoDreamHook` trait lives there too — see
// `nexo_driver_types::auto_dream`. The `RunOutcome → AutoDreamOutcomeKind`
// conversion lives in `nexo-dream`, inside the
// `impl AutoDreamHook for AutoDreamRunner` body.
33
34#[derive(Clone, Debug, Serialize, Deserialize)]
35#[serde(tag = "kind", rename_all = "snake_case")]
36pub enum DriverEvent {
37    GoalStarted {
38        goal: Goal,
39    },
40    GoalCompleted {
41        outcome: GoalOutcome,
42    },
43    AttemptStarted {
44        goal_id: GoalId,
45        turn_index: u32,
46        usage: BudgetUsage,
47    },
48    AttemptCompleted {
49        result: AttemptResult,
50    },
51    Decision {
52        decision: Decision,
53    },
54    Acceptance {
55        goal_id: GoalId,
56        verdict: AcceptanceVerdict,
57    },
58    BudgetExhausted {
59        goal_id: GoalId,
60        axis: BudgetAxis,
61        usage: BudgetUsage,
62    },
63    Escalate {
64        goal_id: GoalId,
65        reason: String,
66    },
67    /// Replay-policy classified a mid-turn error.
68    ReplayDecision {
69        goal_id: GoalId,
70        turn_index: u32,
71        decision: ReplayDecision,
72        error_message: String,
73    },
74    /// Orchestrator scheduled a `/compact` turn.
75    CompactRequested {
76        goal_id: GoalId,
77        turn_index: u32,
78        focus: String,
79        token_pressure: f64,
80        /// Token estimate before compaction.
81        before_tokens: u64,
82        /// Session age in minutes when trigger fired.
83        age_minutes: u64,
84        /// What caused the trigger.
85        trigger: CompactTrigger,
86    },
87    /// Compaction completed. Emitted on the turn after the
88    /// compact turn, once `after_tokens` is known.
89    CompactCompleted {
90        goal_id: GoalId,
91        turn_index: u32,
92        after_tokens: u64,
93    },
94    /// Compact summary persisted to long-term memory.
95    CompactSummaryStored {
96        goal_id: GoalId,
97        turn_index: u32,
98        before_tokens: u64,
99        after_tokens: u64,
100    },
101    /// Memory extraction completed.
102    ExtractMemoriesCompleted {
103        goal_id: GoalId,
104        turn_index: u32,
105        memories_saved: u32,
106        duration_ms: u64,
107    },
108    /// Memory extraction skipped (disabled, throttled, etc.).
109    ExtractMemoriesSkipped {
110        goal_id: GoalId,
111        reason: ExtractSkipReason,
112    },
113    /// autoDream consolidation pass outcome. Emitted per turn when the
114    /// runner is wired. Detailed run state lives in the `dream_runs`
115    /// ledger — this event is the lightweight signal for admin-ui /
116    /// chat hooks.
117    AutoDreamOutcome {
118        goal_id: GoalId,
119        outcome_kind: AutoDreamOutcomeKind,
120    },
121    /// Periodic mid-run progress beacon. Fires every
122    /// `progress_every_turns` after an `AttemptCompleted`, so chat
123    /// hooks (`on: progress`) and admin-ui can show 'still going'
124    /// without waiting for the goal to finish.
125    Progress {
126        goal_id: GoalId,
127        turn_index: u32,
128        usage: BudgetUsage,
129        last_text: Option<String>,
130    },
131}
132
133impl DriverEvent {
134    /// NATS subject for this event kind.
135    pub fn nats_subject(&self) -> &'static str {
136        match self {
137            DriverEvent::GoalStarted { .. } => "agent.driver.goal.started",
138            DriverEvent::GoalCompleted { .. } => "agent.driver.goal.completed",
139            DriverEvent::AttemptStarted { .. } => "agent.driver.attempt.started",
140            DriverEvent::AttemptCompleted { .. } => "agent.driver.attempt.completed",
141            DriverEvent::Decision { .. } => "agent.driver.decision",
142            DriverEvent::Acceptance { .. } => "agent.driver.acceptance",
143            DriverEvent::BudgetExhausted { .. } => "agent.driver.budget.exhausted",
144            DriverEvent::Escalate { .. } => "agent.driver.escalate",
145            DriverEvent::ReplayDecision { .. } => "agent.driver.replay",
146            DriverEvent::CompactRequested { .. } => "agent.driver.compact",
147            DriverEvent::CompactCompleted { .. } => "agent.driver.compact.completed",
148            DriverEvent::CompactSummaryStored { .. } => "agent.driver.compact.summary_stored",
149            DriverEvent::ExtractMemoriesCompleted { .. } => {
150                "agent.driver.extract_memories.completed"
151            }
152            DriverEvent::ExtractMemoriesSkipped { .. } => "agent.driver.extract_memories.skipped",
153            DriverEvent::AutoDreamOutcome { .. } => "agent.driver.auto_dream",
154            DriverEvent::Progress { .. } => "agent.driver.progress",
155        }
156    }
157}
158
159#[async_trait]
160pub trait DriverEventSink: Send + Sync + 'static {
161    async fn publish(&self, event: DriverEvent) -> Result<(), DriverError>;
162}
163
/// Event sink that discards every event.
///
/// A stateless unit struct, so it also derives `Debug`, `Clone` and `Copy`:
/// it can be logged, embedded in `#[derive(Debug)]` containers and passed
/// around by value.
#[derive(Clone, Copy, Debug, Default)]
pub struct NoopEventSink;
166
167#[async_trait]
168impl DriverEventSink for NoopEventSink {
169    async fn publish(&self, _event: DriverEvent) -> Result<(), DriverError> {
170        Ok(())
171    }
172}
173
/// Event sink that publishes each [`DriverEvent`] to NATS.
///
/// Only compiled when the `nats` feature is enabled.
#[cfg(feature = "nats")]
pub struct NatsEventSink {
    /// NATS client used for every publish.
    client: async_nats::Client,
}
178
#[cfg(feature = "nats")]
impl NatsEventSink {
    /// Builds a sink that publishes through the given NATS `client`.
    pub fn new(client: async_nats::Client) -> Self {
        NatsEventSink { client }
    }
}
185
#[cfg(feature = "nats")]
#[async_trait]
impl DriverEventSink for NatsEventSink {
    /// Serializes `event` as JSON and publishes it on the subject returned
    /// by [`DriverEvent::nats_subject`].
    ///
    /// # Errors
    ///
    /// Propagates the serialization error (converted into [`DriverError`]
    /// by `?`), or returns `DriverError::Nats` carrying the client error's
    /// message when the publish call fails.
    async fn publish(&self, event: DriverEvent) -> Result<(), DriverError> {
        let subject = event.nats_subject().to_string();
        let body = serde_json::to_vec(&event)?;
        // The mapped result is already `Result<(), DriverError>`, so it is
        // returned as-is.
        self.client
            .publish(subject, body.into())
            .await
            .map_err(|err| DriverError::Nats(err.to_string()))
    }
}
199
#[cfg(test)]
mod tests {
    use super::*;
    use nexo_driver_types::AttemptOutcome;
    use uuid::Uuid;

    /// The no-op sink accepts an event and reports success.
    #[tokio::test]
    async fn noop_sink_always_ok() {
        let s = NoopEventSink;
        s.publish(DriverEvent::Escalate {
            goal_id: GoalId(Uuid::nil()),
            reason: "x".into(),
        })
        .await
        .unwrap();
    }

    /// Subjects are a contract with subscribers: pin the literal string for
    /// every variant that can be built from the types visible in this
    /// module.
    #[test]
    fn nats_subjects_stable() {
        let g = GoalId(Uuid::nil());
        let cases: Vec<(DriverEvent, &str)> = vec![
            (
                DriverEvent::Escalate {
                    goal_id: g,
                    reason: "x".into(),
                },
                "agent.driver.escalate",
            ),
            (
                DriverEvent::BudgetExhausted {
                    goal_id: g,
                    axis: BudgetAxis::Turns,
                    usage: BudgetUsage::default(),
                },
                "agent.driver.budget.exhausted",
            ),
            (
                DriverEvent::Progress {
                    goal_id: g,
                    turn_index: 5,
                    usage: BudgetUsage::default(),
                    last_text: None,
                },
                "agent.driver.progress",
            ),
            (
                DriverEvent::CompactCompleted {
                    goal_id: g,
                    turn_index: 1,
                    after_tokens: 10,
                },
                "agent.driver.compact.completed",
            ),
            (
                DriverEvent::CompactSummaryStored {
                    goal_id: g,
                    turn_index: 1,
                    before_tokens: 100,
                    after_tokens: 10,
                },
                "agent.driver.compact.summary_stored",
            ),
            (
                DriverEvent::ExtractMemoriesCompleted {
                    goal_id: g,
                    turn_index: 1,
                    memories_saved: 2,
                    duration_ms: 3,
                },
                "agent.driver.extract_memories.completed",
            ),
            (
                DriverEvent::ExtractMemoriesSkipped {
                    goal_id: g,
                    reason: ExtractSkipReason::Throttled,
                },
                "agent.driver.extract_memories.skipped",
            ),
        ];
        for (e, want) in cases {
            assert_eq!(e.nats_subject(), want);
        }
    }

    /// Skip reasons travel as snake_case strings; renaming a variant would
    /// silently change the wire format, so pin each name.
    #[test]
    fn extract_skip_reason_serializes_snake_case() {
        let cases = vec![
            (ExtractSkipReason::Disabled, "\"disabled\""),
            (ExtractSkipReason::Throttled, "\"throttled\""),
            (ExtractSkipReason::InProgress, "\"in_progress\""),
            (
                ExtractSkipReason::CircuitBreakerOpen,
                "\"circuit_breaker_open\"",
            ),
            (ExtractSkipReason::MainAgentWrote, "\"main_agent_wrote\""),
        ];
        for (reason, want) in cases {
            assert_eq!(serde_json::to_string(&reason).unwrap(), want);
        }
    }

    /// An event survives a JSON round trip and carries its `kind` tag.
    #[test]
    fn driver_event_round_trips_json() {
        let e = DriverEvent::AttemptCompleted {
            result: AttemptResult {
                goal_id: GoalId(Uuid::nil()),
                turn_index: 0,
                outcome: AttemptOutcome::Done,
                decisions_recorded: vec![],
                usage_after: BudgetUsage::default(),
                acceptance: None,
                final_text: None,
                harness_extras: serde_json::Map::new(),
            },
        };
        let s = serde_json::to_string(&e).unwrap();
        // Internally tagged representation: the snake_case variant name is
        // stored under `kind`.
        assert!(s.contains(r#""kind":"attempt_completed""#));
        let back: DriverEvent = serde_json::from_str(&s).unwrap();
        assert_eq!(back.nats_subject(), e.nats_subject());
    }
}