ao_core/events.rs
1//! Events broadcast by the `LifecycleManager` to anyone watching the
2//! session fleet — the CLI's `ao-rs watch`, future reaction engines,
3//! future notifier plugins, an eventual SSE API.
4//!
5//! **All variants must be `Clone`** because they ride on
6//! `tokio::sync::broadcast`, which fans a single send out to every
7//! subscriber by cloning.
8//!
9//! We keep the event surface intentionally small for Phase C:
10//! - `Spawned` when a brand-new session is observed for the first time
11//! - `SessionRestored` when a session that already existed on disk is
12//! observed on the loop's first tick — separate from `Spawned` so
13//! `watch` and dashboard consumers don't mislabel pre-existing
14//! sessions as new
15//! - `StatusChanged` when lifecycle transitions a session between
16//! `SessionStatus` variants
17//! - `ActivityChanged` when the polled `ActivityState` changes
18//! - `Terminated` when the runtime is no longer alive — separate from
19//! `StatusChanged` because subscribers often want to react to *dead*
20//! specifically (e.g. start cleanup)
21//! - `TickError` surfaces polling-loop errors without killing the loop
22
23use crate::{
24 reactions::ReactionAction,
25 types::{ActivityState, SessionId, SessionStatus},
26};
27use serde::Serialize;
28
29#[derive(Debug, Clone, Serialize)]
30pub struct UiNotification {
31 pub id: SessionId,
32 pub reaction_key: String,
33 pub action: ReactionAction,
34 #[serde(skip_serializing_if = "Option::is_none")]
35 pub message: Option<String>,
36 #[serde(skip_serializing_if = "Option::is_none")]
37 pub priority: Option<String>,
38}
39
40#[derive(Debug, Clone, Serialize)]
41#[serde(tag = "type", rename_all = "snake_case")]
42pub enum OrchestratorEvent {
43 /// A session was created after the lifecycle loop was already running.
44 /// The loop decides "new" by comparing `session.created_at` against its
45 /// own startup timestamp — a session observed on the first tick whose
46 /// `created_at` predates startup is reported via `SessionRestored`
47 /// instead, so `watch` output distinguishes "brand new spawn" from
48 /// "restored from disk".
49 Spawned { id: SessionId, project_id: String },
50
51 /// A session that already existed on disk was observed by the
52 /// lifecycle loop on its first tick after startup. Emitted at most
53 /// once per session and only during the first tick — subsequent
54 /// appearances use `Spawned`. Consumers use this to suppress the
55 /// "N sessions just spawned" flood on reconnect.
56 SessionRestored {
57 id: SessionId,
58 project_id: String,
59 /// On-disk status at the moment of observation. Useful for UI
60 /// filtering (e.g. skip terminal sessions) without an extra
61 /// snapshot round-trip.
62 status: SessionStatus,
63 },
64
65 /// Lifecycle-driven status transition. `from == to` is never emitted.
66 StatusChanged {
67 id: SessionId,
68 from: SessionStatus,
69 to: SessionStatus,
70 },
71
72 /// Polled activity changed. `prev` is `None` on the first successful poll.
73 ActivityChanged {
74 id: SessionId,
75 prev: Option<ActivityState>,
76 next: ActivityState,
77 },
78
79 /// Runtime process is gone. Emitted exactly once per session.
80 Terminated {
81 id: SessionId,
82 reason: TerminationReason,
83 },
84
85 /// Polling-loop error for one session. The loop itself keeps running.
86 TickError { id: SessionId, message: String },
87
88 /// A configured reaction successfully ran its action. The engine emits
89 /// this on every successful dispatch — subscribers use it to surface
90 /// "ao-rs just fired X" in the CLI and for assertions in tests.
91 ///
92 /// `action` is the action the engine *actually* took, which may differ
93 /// from the configured action if the engine escalated mid-flight
94 /// (`SendToAgent` → `Notify`). Pair with `ReactionEscalated` to tell
95 /// first-time successes apart from escalations.
96 ReactionTriggered {
97 id: SessionId,
98 /// Reaction key from config (e.g. `"ci-failed"`).
99 reaction_key: String,
100 /// Action the engine actually executed this attempt.
101 action: ReactionAction,
102 },
103
104 /// The retry budget for a reaction was exhausted and the engine fell
105 /// back to `Notify`. Emitted *in addition to* the `ReactionTriggered`
106 /// that represents the escalated notify — so subscribers that only
107 /// care about "something was escalated" can filter on this event
108 /// alone without having to join on attempts counts.
109 ReactionEscalated {
110 id: SessionId,
111 reaction_key: String,
112 /// How many attempts had been made when escalation was decided.
113 /// The value is the attempt count *that triggered* escalation,
114 /// not `retries + 1`, so a user reading logs sees exactly how
115 /// many times the agent was poked before the notify fell through.
116 attempts: u32,
117 },
118
119 /// UI-friendly notification event (dashboard/desktop toasts).
120 ///
121 /// Emitted in addition to `ReactionTriggered` for reactions that should
122 /// surface to users in real time.
123 UiNotification { notification: UiNotification },
124}
125
126#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
127#[serde(rename_all = "snake_case")]
128pub enum TerminationReason {
129 /// `Runtime::is_alive` returned false.
130 RuntimeGone,
131 /// The agent plugin reported `ActivityState::Exited`.
132 AgentExited,
133 /// Session had no runtime_handle to probe (e.g. crashed before create).
134 NoHandle,
135}
136
137impl TerminationReason {
138 pub const fn as_str(self) -> &'static str {
139 match self {
140 Self::RuntimeGone => "runtime_gone",
141 Self::AgentExited => "agent_exited",
142 Self::NoHandle => "no_handle",
143 }
144 }
145}
146
147impl std::fmt::Display for TerminationReason {
148 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149 f.write_str(self.as_str())
150 }
151}
152
153#[cfg(test)]
154mod tests {
155 //! Serde tag checks — the wire form is public (SSE / logs), so a rename
156 //! is a breaking change. These tests pin every variant's `type` tag.
157
158 use super::*;
159 use serde_json::{json, Value};
160
161 fn tag_of(ev: &OrchestratorEvent) -> String {
162 let v: Value = serde_json::to_value(ev).unwrap();
163 v.get("type")
164 .and_then(Value::as_str)
165 .expect("event missing `type` tag")
166 .to_string()
167 }
168
169 fn sid(s: &str) -> SessionId {
170 SessionId(s.into())
171 }
172
173 #[test]
174 fn every_variant_has_expected_tag() {
175 let cases: &[(&str, OrchestratorEvent)] = &[
176 (
177 "spawned",
178 OrchestratorEvent::Spawned {
179 id: sid("s1"),
180 project_id: "demo".into(),
181 },
182 ),
183 (
184 "session_restored",
185 OrchestratorEvent::SessionRestored {
186 id: sid("s1"),
187 project_id: "demo".into(),
188 status: SessionStatus::Spawning,
189 },
190 ),
191 (
192 "status_changed",
193 OrchestratorEvent::StatusChanged {
194 id: sid("s1"),
195 from: SessionStatus::Spawning,
196 to: SessionStatus::Working,
197 },
198 ),
199 (
200 "activity_changed",
201 OrchestratorEvent::ActivityChanged {
202 id: sid("s1"),
203 prev: None,
204 next: ActivityState::Ready,
205 },
206 ),
207 (
208 "terminated",
209 OrchestratorEvent::Terminated {
210 id: sid("s1"),
211 reason: TerminationReason::AgentExited,
212 },
213 ),
214 (
215 "tick_error",
216 OrchestratorEvent::TickError {
217 id: sid("s1"),
218 message: "boom".into(),
219 },
220 ),
221 (
222 "reaction_triggered",
223 OrchestratorEvent::ReactionTriggered {
224 id: sid("s1"),
225 reaction_key: "ci-failed".into(),
226 action: ReactionAction::Notify,
227 },
228 ),
229 (
230 "reaction_escalated",
231 OrchestratorEvent::ReactionEscalated {
232 id: sid("s1"),
233 reaction_key: "ci-failed".into(),
234 attempts: 3,
235 },
236 ),
237 (
238 "ui_notification",
239 OrchestratorEvent::UiNotification {
240 notification: UiNotification {
241 id: sid("s1"),
242 reaction_key: "ci-failed".into(),
243 action: ReactionAction::Notify,
244 message: None,
245 priority: None,
246 },
247 },
248 ),
249 ];
250
251 for (expected, ev) in cases {
252 assert_eq!(&tag_of(ev), expected, "wrong tag for {ev:?}");
253 }
254 }
255
256 #[test]
257 fn session_restored_carries_status_field() {
258 let ev = OrchestratorEvent::SessionRestored {
259 id: sid("s1"),
260 project_id: "demo".into(),
261 status: SessionStatus::Working,
262 };
263 let v: Value = serde_json::to_value(&ev).unwrap();
264 assert_eq!(
265 v,
266 json!({
267 "type": "session_restored",
268 "id": "s1",
269 "project_id": "demo",
270 "status": "working",
271 })
272 );
273 }
274
275 #[test]
276 fn termination_reason_wire_form_is_snake_case() {
277 assert_eq!(
278 serde_json::to_value(TerminationReason::RuntimeGone).unwrap(),
279 json!("runtime_gone")
280 );
281 assert_eq!(
282 serde_json::to_value(TerminationReason::AgentExited).unwrap(),
283 json!("agent_exited")
284 );
285 assert_eq!(
286 serde_json::to_value(TerminationReason::NoHandle).unwrap(),
287 json!("no_handle")
288 );
289 }
290}