Skip to main content

ao_core/
events.rs

1//! Events broadcast by the `LifecycleManager` to anyone watching the
2//! session fleet — the CLI's `ao-rs watch`, future reaction engines,
3//! future notifier plugins, an eventual SSE API.
4//!
5//! **All variants must be `Clone`** because they ride on
6//! `tokio::sync::broadcast`, which fans a single send out to every
7//! subscriber by cloning.
8//!
9//! We keep the event surface intentionally small for Phase C:
10//! - `Spawned` when a brand-new session is observed for the first time
11//! - `SessionRestored` when a session that already existed on disk is
12//!   observed on the loop's first tick — separate from `Spawned` so
13//!   `watch` and dashboard consumers don't mislabel pre-existing
14//!   sessions as new
15//! - `StatusChanged` when lifecycle transitions a session between
16//!   `SessionStatus` variants
17//! - `ActivityChanged` when the polled `ActivityState` changes
18//! - `Terminated` when the runtime is no longer alive — separate from
19//!   `StatusChanged` because subscribers often want to react to *dead*
20//!   specifically (e.g. start cleanup)
21//! - `TickError` surfaces polling-loop errors without killing the loop
22
23use crate::{
24    reactions::ReactionAction,
25    types::{ActivityState, SessionId, SessionStatus},
26};
27use serde::Serialize;
28
29#[derive(Debug, Clone, Serialize)]
30pub struct UiNotification {
31    pub id: SessionId,
32    pub reaction_key: String,
33    pub action: ReactionAction,
34    #[serde(skip_serializing_if = "Option::is_none")]
35    pub message: Option<String>,
36    #[serde(skip_serializing_if = "Option::is_none")]
37    pub priority: Option<String>,
38}
39
40#[derive(Debug, Clone, Serialize)]
41#[serde(tag = "type", rename_all = "snake_case")]
42pub enum OrchestratorEvent {
43    /// A session was created after the lifecycle loop was already running.
44    /// The loop decides "new" by comparing `session.created_at` against its
45    /// own startup timestamp — a session observed on the first tick whose
46    /// `created_at` predates startup is reported via `SessionRestored`
47    /// instead, so `watch` output distinguishes "brand new spawn" from
48    /// "restored from disk".
49    Spawned { id: SessionId, project_id: String },
50
51    /// A session that already existed on disk was observed by the
52    /// lifecycle loop on its first tick after startup. Emitted at most
53    /// once per session and only during the first tick — subsequent
54    /// appearances use `Spawned`. Consumers use this to suppress the
55    /// "N sessions just spawned" flood on reconnect.
56    SessionRestored {
57        id: SessionId,
58        project_id: String,
59        /// On-disk status at the moment of observation. Useful for UI
60        /// filtering (e.g. skip terminal sessions) without an extra
61        /// snapshot round-trip.
62        status: SessionStatus,
63    },
64
65    /// Lifecycle-driven status transition. `from == to` is never emitted.
66    StatusChanged {
67        id: SessionId,
68        from: SessionStatus,
69        to: SessionStatus,
70    },
71
72    /// Polled activity changed. `prev` is `None` on the first successful poll.
73    ActivityChanged {
74        id: SessionId,
75        prev: Option<ActivityState>,
76        next: ActivityState,
77    },
78
79    /// Runtime process is gone. Emitted exactly once per session.
80    Terminated {
81        id: SessionId,
82        reason: TerminationReason,
83    },
84
85    /// Polling-loop error for one session. The loop itself keeps running.
86    TickError { id: SessionId, message: String },
87
88    /// A configured reaction successfully ran its action. The engine emits
89    /// this on every successful dispatch — subscribers use it to surface
90    /// "ao-rs just fired X" in the CLI and for assertions in tests.
91    ///
92    /// `action` is the action the engine *actually* took, which may differ
93    /// from the configured action if the engine escalated mid-flight
94    /// (`SendToAgent` → `Notify`). Pair with `ReactionEscalated` to tell
95    /// first-time successes apart from escalations.
96    ReactionTriggered {
97        id: SessionId,
98        /// Reaction key from config (e.g. `"ci-failed"`).
99        reaction_key: String,
100        /// Action the engine actually executed this attempt.
101        action: ReactionAction,
102    },
103
104    /// The retry budget for a reaction was exhausted and the engine fell
105    /// back to `Notify`. Emitted *in addition to* the `ReactionTriggered`
106    /// that represents the escalated notify — so subscribers that only
107    /// care about "something was escalated" can filter on this event
108    /// alone without having to join on attempts counts.
109    ReactionEscalated {
110        id: SessionId,
111        reaction_key: String,
112        /// How many attempts had been made when escalation was decided.
113        /// The value is the attempt count *that triggered* escalation,
114        /// not `retries + 1`, so a user reading logs sees exactly how
115        /// many times the agent was poked before the notify fell through.
116        attempts: u32,
117    },
118
119    /// UI-friendly notification event (dashboard/desktop toasts).
120    ///
121    /// Emitted in addition to `ReactionTriggered` for reactions that should
122    /// surface to users in real time.
123    UiNotification { notification: UiNotification },
124}
125
126#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
127#[serde(rename_all = "snake_case")]
128pub enum TerminationReason {
129    /// `Runtime::is_alive` returned false.
130    RuntimeGone,
131    /// The agent plugin reported `ActivityState::Exited`.
132    AgentExited,
133    /// Session had no runtime_handle to probe (e.g. crashed before create).
134    NoHandle,
135}
136
137impl TerminationReason {
138    pub const fn as_str(self) -> &'static str {
139        match self {
140            Self::RuntimeGone => "runtime_gone",
141            Self::AgentExited => "agent_exited",
142            Self::NoHandle => "no_handle",
143        }
144    }
145}
146
147impl std::fmt::Display for TerminationReason {
148    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149        f.write_str(self.as_str())
150    }
151}
152
153#[cfg(test)]
154mod tests {
155    //! Serde tag checks — the wire form is public (SSE / logs), so a rename
156    //! is a breaking change. These tests pin every variant's `type` tag.
157
158    use super::*;
159    use serde_json::{json, Value};
160
161    fn tag_of(ev: &OrchestratorEvent) -> String {
162        let v: Value = serde_json::to_value(ev).unwrap();
163        v.get("type")
164            .and_then(Value::as_str)
165            .expect("event missing `type` tag")
166            .to_string()
167    }
168
169    fn sid(s: &str) -> SessionId {
170        SessionId(s.into())
171    }
172
173    #[test]
174    fn every_variant_has_expected_tag() {
175        let cases: &[(&str, OrchestratorEvent)] = &[
176            (
177                "spawned",
178                OrchestratorEvent::Spawned {
179                    id: sid("s1"),
180                    project_id: "demo".into(),
181                },
182            ),
183            (
184                "session_restored",
185                OrchestratorEvent::SessionRestored {
186                    id: sid("s1"),
187                    project_id: "demo".into(),
188                    status: SessionStatus::Spawning,
189                },
190            ),
191            (
192                "status_changed",
193                OrchestratorEvent::StatusChanged {
194                    id: sid("s1"),
195                    from: SessionStatus::Spawning,
196                    to: SessionStatus::Working,
197                },
198            ),
199            (
200                "activity_changed",
201                OrchestratorEvent::ActivityChanged {
202                    id: sid("s1"),
203                    prev: None,
204                    next: ActivityState::Ready,
205                },
206            ),
207            (
208                "terminated",
209                OrchestratorEvent::Terminated {
210                    id: sid("s1"),
211                    reason: TerminationReason::AgentExited,
212                },
213            ),
214            (
215                "tick_error",
216                OrchestratorEvent::TickError {
217                    id: sid("s1"),
218                    message: "boom".into(),
219                },
220            ),
221            (
222                "reaction_triggered",
223                OrchestratorEvent::ReactionTriggered {
224                    id: sid("s1"),
225                    reaction_key: "ci-failed".into(),
226                    action: ReactionAction::Notify,
227                },
228            ),
229            (
230                "reaction_escalated",
231                OrchestratorEvent::ReactionEscalated {
232                    id: sid("s1"),
233                    reaction_key: "ci-failed".into(),
234                    attempts: 3,
235                },
236            ),
237            (
238                "ui_notification",
239                OrchestratorEvent::UiNotification {
240                    notification: UiNotification {
241                        id: sid("s1"),
242                        reaction_key: "ci-failed".into(),
243                        action: ReactionAction::Notify,
244                        message: None,
245                        priority: None,
246                    },
247                },
248            ),
249        ];
250
251        for (expected, ev) in cases {
252            assert_eq!(&tag_of(ev), expected, "wrong tag for {ev:?}");
253        }
254    }
255
256    #[test]
257    fn session_restored_carries_status_field() {
258        let ev = OrchestratorEvent::SessionRestored {
259            id: sid("s1"),
260            project_id: "demo".into(),
261            status: SessionStatus::Working,
262        };
263        let v: Value = serde_json::to_value(&ev).unwrap();
264        assert_eq!(
265            v,
266            json!({
267                "type": "session_restored",
268                "id": "s1",
269                "project_id": "demo",
270                "status": "working",
271            })
272        );
273    }
274
275    #[test]
276    fn termination_reason_wire_form_is_snake_case() {
277        assert_eq!(
278            serde_json::to_value(TerminationReason::RuntimeGone).unwrap(),
279            json!("runtime_gone")
280        );
281        assert_eq!(
282            serde_json::to_value(TerminationReason::AgentExited).unwrap(),
283            json!("agent_exited")
284        );
285        assert_eq!(
286            serde_json::to_value(TerminationReason::NoHandle).unwrap(),
287            json!("no_handle")
288        );
289    }
290}