1use async_trait::async_trait;
6use nexo_driver_types::CompactTrigger;
7use nexo_driver_types::{
8 AcceptanceVerdict, AttemptResult, AutoDreamOutcomeKind, BudgetAxis, BudgetUsage, Decision,
9 Goal, GoalId,
10};
11use serde::{Deserialize, Serialize};
12
13use crate::error::DriverError;
14use crate::orchestrator::GoalOutcome;
15use crate::replay::ReplayDecision;
16
17#[derive(Clone, Debug, Serialize, Deserialize)]
19#[serde(rename_all = "snake_case")]
20pub enum ExtractSkipReason {
21 Disabled,
22 Throttled,
23 InProgress,
24 CircuitBreakerOpen,
25 MainAgentWrote,
26}
27
28#[derive(Clone, Debug, Serialize, Deserialize)]
35#[serde(tag = "kind", rename_all = "snake_case")]
36pub enum DriverEvent {
37 GoalStarted {
38 goal: Goal,
39 },
40 GoalCompleted {
41 outcome: GoalOutcome,
42 },
43 AttemptStarted {
44 goal_id: GoalId,
45 turn_index: u32,
46 usage: BudgetUsage,
47 },
48 AttemptCompleted {
49 result: AttemptResult,
50 },
51 Decision {
52 decision: Decision,
53 },
54 Acceptance {
55 goal_id: GoalId,
56 verdict: AcceptanceVerdict,
57 },
58 BudgetExhausted {
59 goal_id: GoalId,
60 axis: BudgetAxis,
61 usage: BudgetUsage,
62 },
63 Escalate {
64 goal_id: GoalId,
65 reason: String,
66 },
67 ReplayDecision {
69 goal_id: GoalId,
70 turn_index: u32,
71 decision: ReplayDecision,
72 error_message: String,
73 },
74 CompactRequested {
76 goal_id: GoalId,
77 turn_index: u32,
78 focus: String,
79 token_pressure: f64,
80 before_tokens: u64,
82 age_minutes: u64,
84 trigger: CompactTrigger,
86 },
87 CompactCompleted {
90 goal_id: GoalId,
91 turn_index: u32,
92 after_tokens: u64,
93 },
94 CompactSummaryStored {
96 goal_id: GoalId,
97 turn_index: u32,
98 before_tokens: u64,
99 after_tokens: u64,
100 },
101 ExtractMemoriesCompleted {
103 goal_id: GoalId,
104 turn_index: u32,
105 memories_saved: u32,
106 duration_ms: u64,
107 },
108 ExtractMemoriesSkipped {
110 goal_id: GoalId,
111 reason: ExtractSkipReason,
112 },
113 AutoDreamOutcome {
118 goal_id: GoalId,
119 outcome_kind: AutoDreamOutcomeKind,
120 },
121 Progress {
126 goal_id: GoalId,
127 turn_index: u32,
128 usage: BudgetUsage,
129 last_text: Option<String>,
130 },
131}
132
133impl DriverEvent {
134 pub fn nats_subject(&self) -> &'static str {
136 match self {
137 DriverEvent::GoalStarted { .. } => "agent.driver.goal.started",
138 DriverEvent::GoalCompleted { .. } => "agent.driver.goal.completed",
139 DriverEvent::AttemptStarted { .. } => "agent.driver.attempt.started",
140 DriverEvent::AttemptCompleted { .. } => "agent.driver.attempt.completed",
141 DriverEvent::Decision { .. } => "agent.driver.decision",
142 DriverEvent::Acceptance { .. } => "agent.driver.acceptance",
143 DriverEvent::BudgetExhausted { .. } => "agent.driver.budget.exhausted",
144 DriverEvent::Escalate { .. } => "agent.driver.escalate",
145 DriverEvent::ReplayDecision { .. } => "agent.driver.replay",
146 DriverEvent::CompactRequested { .. } => "agent.driver.compact",
147 DriverEvent::CompactCompleted { .. } => "agent.driver.compact.completed",
148 DriverEvent::CompactSummaryStored { .. } => "agent.driver.compact.summary_stored",
149 DriverEvent::ExtractMemoriesCompleted { .. } => {
150 "agent.driver.extract_memories.completed"
151 }
152 DriverEvent::ExtractMemoriesSkipped { .. } => "agent.driver.extract_memories.skipped",
153 DriverEvent::AutoDreamOutcome { .. } => "agent.driver.auto_dream",
154 DriverEvent::Progress { .. } => "agent.driver.progress",
155 }
156 }
157}
158
159#[async_trait]
160pub trait DriverEventSink: Send + Sync + 'static {
161 async fn publish(&self, event: DriverEvent) -> Result<(), DriverError>;
162}
163
164#[derive(Default)]
165pub struct NoopEventSink;
166
167#[async_trait]
168impl DriverEventSink for NoopEventSink {
169 async fn publish(&self, _event: DriverEvent) -> Result<(), DriverError> {
170 Ok(())
171 }
172}
173
174#[cfg(feature = "nats")]
175pub struct NatsEventSink {
176 client: async_nats::Client,
177}
178
179#[cfg(feature = "nats")]
180impl NatsEventSink {
181 pub fn new(client: async_nats::Client) -> Self {
182 Self { client }
183 }
184}
185
186#[cfg(feature = "nats")]
187#[async_trait]
188impl DriverEventSink for NatsEventSink {
189 async fn publish(&self, event: DriverEvent) -> Result<(), DriverError> {
190 let subject = event.nats_subject().to_string();
191 let payload = serde_json::to_vec(&event)?;
192 self.client
193 .publish(subject, payload.into())
194 .await
195 .map_err(|e| DriverError::Nats(e.to_string()))?;
196 Ok(())
197 }
198}
199
200#[cfg(test)]
201mod tests {
202 use super::*;
203 use nexo_driver_types::AttemptOutcome;
204 use uuid::Uuid;
205
206 #[tokio::test]
207 async fn noop_sink_always_ok() {
208 let s = NoopEventSink;
209 s.publish(DriverEvent::Escalate {
210 goal_id: GoalId(Uuid::nil()),
211 reason: "x".into(),
212 })
213 .await
214 .unwrap();
215 }
216
217 #[test]
218 fn nats_subjects_stable() {
219 let g = GoalId(Uuid::nil());
220 let cases: Vec<(DriverEvent, &str)> = vec![
221 (
222 DriverEvent::Escalate {
223 goal_id: g,
224 reason: "x".into(),
225 },
226 "agent.driver.escalate",
227 ),
228 (
229 DriverEvent::BudgetExhausted {
230 goal_id: g,
231 axis: BudgetAxis::Turns,
232 usage: BudgetUsage::default(),
233 },
234 "agent.driver.budget.exhausted",
235 ),
236 (
237 DriverEvent::Progress {
238 goal_id: g,
239 turn_index: 5,
240 usage: BudgetUsage::default(),
241 last_text: None,
242 },
243 "agent.driver.progress",
244 ),
245 ];
246 for (e, want) in cases {
247 assert_eq!(e.nats_subject(), want);
248 }
249 }
250
251 #[test]
252 fn driver_event_round_trips_json() {
253 let e = DriverEvent::AttemptCompleted {
254 result: AttemptResult {
255 goal_id: GoalId(Uuid::nil()),
256 turn_index: 0,
257 outcome: AttemptOutcome::Done,
258 decisions_recorded: vec![],
259 usage_after: BudgetUsage::default(),
260 acceptance: None,
261 final_text: None,
262 harness_extras: serde_json::Map::new(),
263 },
264 };
265 let s = serde_json::to_string(&e).unwrap();
266 let back: DriverEvent = serde_json::from_str(&s).unwrap();
267 assert_eq!(back.nats_subject(), e.nats_subject());
268 }
269}