1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
//! Events broadcast by the `LifecycleManager` to anyone watching the
//! session fleet — the CLI's `ao-rs watch`, future reaction engines,
//! future notifier plugins, an eventual SSE API.
//!
//! **All variants must be `Clone`** because they ride on
//! `tokio::sync::broadcast`, which fans a single send out to every
//! subscriber by cloning.
//!
//! We keep the event surface intentionally small for Phase C:
//! - `Spawned` when a brand-new session is observed for the first time
//! - `SessionRestored` when a session that already existed on disk is
//! observed on the loop's first tick — separate from `Spawned` so
//! `watch` and dashboard consumers don't mislabel pre-existing
//! sessions as new
//! - `StatusChanged` when lifecycle transitions a session between
//! `SessionStatus` variants
//! - `ActivityChanged` when the polled `ActivityState` changes
//! - `Terminated` when the runtime is no longer alive — separate from
//! `StatusChanged` because subscribers often want to react to *dead*
//! specifically (e.g. start cleanup)
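//! - `TickError` surfaces polling-loop errors without killing the loop
//!
//! The reaction engine adds three more variants on top of that core set:
//! - `ReactionTriggered` when a configured reaction successfully ran its
//! action
//! - `ReactionEscalated` when a reaction's retry budget was exhausted and
//! the engine fell back to `Notify`
//! - `UiNotification` for reactions that should surface to users in real
//! time (dashboard/desktop toasts)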
use crate::{
reactions::ReactionAction,
types::{ActivityState, SessionId, SessionStatus},
};
use serde::Serialize;
/// Payload carried by [`OrchestratorEvent::UiNotification`]: a user-facing
/// notification intended for dashboard/desktop toasts.
///
/// `message` and `priority` are left out of the serialized form entirely
/// when they are `None`, rather than being emitted as `null`.
#[derive(Debug, Clone, Serialize)]
pub struct UiNotification {
/// Session the notification is about.
pub id: SessionId,
/// Reaction key associated with the notification (e.g. `"ci-failed"`).
pub reaction_key: String,
/// Reaction action reported alongside the notification.
pub action: ReactionAction,
/// Optional message text; skipped during serialization when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub message: Option<String>,
/// Optional priority label; skipped during serialization when `None`.
/// NOTE(review): the set of accepted priority strings is not visible in
/// this file — confirm against the code that builds this struct.
#[serde(skip_serializing_if = "Option::is_none")]
pub priority: Option<String>,
}
/// Event broadcast to subscribers watching the session fleet.
///
/// Serialized as an internally tagged object: the variant name, converted
/// to `snake_case`, is stored under the `type` key alongside the variant's
/// own fields (e.g. `{"type": "session_restored", "id": ..., ...}`).
/// The tag strings are part of the public wire form, so renaming a variant
/// is a breaking change.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum OrchestratorEvent {
/// A session was created after the lifecycle loop was already running.
/// The loop decides "new" by comparing `session.created_at` against its
/// own startup timestamp — a session observed on the first tick whose
/// `created_at` predates startup is reported via `SessionRestored`
/// instead, so `watch` output distinguishes "brand new spawn" from
/// "restored from disk".
Spawned { id: SessionId, project_id: String },
/// A session that already existed on disk was observed by the
/// lifecycle loop on its first tick after startup. Emitted at most
/// once per session and only during the first tick — subsequent
/// appearances use `Spawned`. Consumers use this to suppress the
/// "N sessions just spawned" flood on reconnect.
SessionRestored {
/// Session that was found on disk.
id: SessionId,
/// Project the restored session belongs to.
project_id: String,
/// On-disk status at the moment of observation. Useful for UI
/// filtering (e.g. skip terminal sessions) without an extra
/// snapshot round-trip.
status: SessionStatus,
},
/// Lifecycle-driven status transition. `from == to` is never emitted.
StatusChanged {
/// Session whose status changed.
id: SessionId,
/// Status before the transition.
from: SessionStatus,
/// Status after the transition.
to: SessionStatus,
},
/// Polled activity changed. `prev` is `None` on the first successful poll.
ActivityChanged {
/// Session whose activity changed.
id: SessionId,
/// Previously polled activity, or `None` on the first successful poll.
prev: Option<ActivityState>,
/// Newly polled activity.
next: ActivityState,
},
/// Runtime process is gone. Emitted exactly once per session.
Terminated {
/// Session that terminated.
id: SessionId,
/// Why the session was considered terminated.
reason: TerminationReason,
},
/// Polling-loop error for one session. The loop itself keeps running.
TickError { id: SessionId, message: String },
/// A configured reaction successfully ran its action. The engine emits
/// this on every successful dispatch — subscribers use it to surface
/// "ao-rs just fired X" in the CLI and for assertions in tests.
///
/// `action` is the action the engine *actually* took, which may differ
/// from the configured action if the engine escalated mid-flight
/// (`SendToAgent` → `Notify`). Pair with `ReactionEscalated` to tell
/// first-time successes apart from escalations.
ReactionTriggered {
/// Session the reaction ran for.
id: SessionId,
/// Reaction key from config (e.g. `"ci-failed"`).
reaction_key: String,
/// Action the engine actually executed this attempt.
action: ReactionAction,
},
/// The retry budget for a reaction was exhausted and the engine fell
/// back to `Notify`. Emitted *in addition to* the `ReactionTriggered`
/// that represents the escalated notify — so subscribers that only
/// care about "something was escalated" can filter on this event
/// alone without having to join on attempts counts.
ReactionEscalated {
/// Session the escalated reaction belongs to.
id: SessionId,
/// Key of the reaction whose retry budget was exhausted.
reaction_key: String,
/// How many attempts had been made when escalation was decided.
/// The value is the attempt count *that triggered* escalation,
/// not `retries + 1`, so a user reading logs sees exactly how
/// many times the agent was poked before the notify fell through.
attempts: u32,
},
/// UI-friendly notification event (dashboard/desktop toasts).
///
/// Emitted in addition to `ReactionTriggered` for reactions that should
/// surface to users in real time. The payload is serialized as a nested
/// object under the `notification` key.
UiNotification { notification: UiNotification },
}
/// Why a session was reported as terminated; carried by
/// [`OrchestratorEvent::Terminated`].
///
/// Serialized as a bare `snake_case` string (e.g. `"runtime_gone"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum TerminationReason {
/// `Runtime::is_alive` returned false.
RuntimeGone,
/// The agent plugin reported `ActivityState::Exited`.
AgentExited,
/// Session had no runtime_handle to probe (e.g. crashed before create).
NoHandle,
}
impl TerminationReason {
pub const fn as_str(self) -> &'static str {
match self {
Self::RuntimeGone => "runtime_gone",
Self::AgentExited => "agent_exited",
Self::NoHandle => "no_handle",
}
}
}
impl std::fmt::Display for TerminationReason {
    /// Writes the same `snake_case` name that [`TerminationReason::as_str`]
    /// returns.
    ///
    /// Goes through `Formatter::pad` rather than `write_str` so that width,
    /// fill, alignment and precision given by the caller (e.g. `{:<14}` in
    /// column-aligned CLI output) are applied instead of silently ignored.
    /// Plain `{}` and `to_string()` output is unaffected.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.pad(self.as_str())
    }
}
#[cfg(test)]
mod tests {
    //! Wire-format regression tests. The serialized form is public (SSE /
    //! logs), so renaming a tag is a breaking change; these tests pin the
    //! `type` tag of every variant.
    use super::*;
    use serde_json::{json, Value};

    /// Serializes `ev` to JSON and returns the string stored under `type`.
    fn tag_of(ev: &OrchestratorEvent) -> String {
        let encoded = serde_json::to_value(ev).unwrap();
        let tag = encoded
            .get("type")
            .and_then(Value::as_str)
            .expect("event missing `type` tag");
        tag.to_owned()
    }

    /// Shorthand for building a `SessionId` from a string literal.
    fn sid(s: &str) -> SessionId {
        let raw = s.into();
        SessionId(raw)
    }

    #[test]
    fn every_variant_has_expected_tag() {
        // One `(expected tag, sample event)` row per enum variant.
        let cases: Vec<(&str, OrchestratorEvent)> = vec![
            (
                "spawned",
                OrchestratorEvent::Spawned {
                    id: sid("s1"),
                    project_id: "demo".into(),
                },
            ),
            (
                "session_restored",
                OrchestratorEvent::SessionRestored {
                    id: sid("s1"),
                    project_id: "demo".into(),
                    status: SessionStatus::Spawning,
                },
            ),
            (
                "status_changed",
                OrchestratorEvent::StatusChanged {
                    id: sid("s1"),
                    from: SessionStatus::Spawning,
                    to: SessionStatus::Working,
                },
            ),
            (
                "activity_changed",
                OrchestratorEvent::ActivityChanged {
                    id: sid("s1"),
                    prev: None,
                    next: ActivityState::Ready,
                },
            ),
            (
                "terminated",
                OrchestratorEvent::Terminated {
                    id: sid("s1"),
                    reason: TerminationReason::AgentExited,
                },
            ),
            (
                "tick_error",
                OrchestratorEvent::TickError {
                    id: sid("s1"),
                    message: "boom".into(),
                },
            ),
            (
                "reaction_triggered",
                OrchestratorEvent::ReactionTriggered {
                    id: sid("s1"),
                    reaction_key: "ci-failed".into(),
                    action: ReactionAction::Notify,
                },
            ),
            (
                "reaction_escalated",
                OrchestratorEvent::ReactionEscalated {
                    id: sid("s1"),
                    reaction_key: "ci-failed".into(),
                    attempts: 3,
                },
            ),
            (
                "ui_notification",
                OrchestratorEvent::UiNotification {
                    notification: UiNotification {
                        id: sid("s1"),
                        reaction_key: "ci-failed".into(),
                        action: ReactionAction::Notify,
                        message: None,
                        priority: None,
                    },
                },
            ),
        ];

        for (expected, ev) in &cases {
            assert_eq!(tag_of(ev), *expected, "wrong tag for {ev:?}");
        }
    }

    #[test]
    fn session_restored_carries_status_field() {
        let restored = OrchestratorEvent::SessionRestored {
            id: sid("s1"),
            project_id: "demo".into(),
            status: SessionStatus::Working,
        };
        // The whole object is compared, so extra or missing keys fail too.
        let expected = json!({
            "type": "session_restored",
            "id": "s1",
            "project_id": "demo",
            "status": "working",
        });

        let actual = serde_json::to_value(&restored).unwrap();
        assert_eq!(actual, expected);
    }

    #[test]
    fn termination_reason_wire_form_is_snake_case() {
        let expectations = [
            (TerminationReason::RuntimeGone, "runtime_gone"),
            (TerminationReason::AgentExited, "agent_exited"),
            (TerminationReason::NoHandle, "no_handle"),
        ];

        for (reason, wire) in expectations {
            assert_eq!(serde_json::to_value(reason).unwrap(), json!(wire));
        }
    }
}