algocline_core/execution/progress.rs
1//! Progress observation types for the `ExecutionService` layer.
2
3use serde::{Deserialize, Serialize};
4
5use super::error::ObserverRecvError;
6use super::pause::{PauseInfo, PauseKind};
7use super::state::ExecutionStateTag;
8use crate::TokenUsage;
9
10/// An event emitted on the per-session broadcast channel.
11///
12/// All variants carry an `at` field (Unix timestamp in milliseconds).
13/// The channel is emitted from [`crate::execution::ExecutionService`] implementations
14/// inside the engine layer; multiple independent observers may subscribe via
15/// [`crate::execution::ExecutionService::observe`] without affecting one another
16/// (crux: Sink-free broadcast Progress fan-out).
17///
18/// # Serde
19/// Uses `#[serde(tag = "kind", rename_all = "snake_case")]` internally tagged
20/// representation, so JSON consumers see `{"kind": "state_transition", ...}`.
21#[derive(Debug, Clone, Serialize, Deserialize)]
22#[serde(tag = "kind", rename_all = "snake_case")]
23pub enum ProgressEvent {
24 /// The session transitioned from one state to another.
25 StateTransition {
26 /// The previous state tag.
27 from: ExecutionStateTag,
28 /// The new state tag.
29 to: ExecutionStateTag,
30 /// Unix timestamp (milliseconds) of the transition.
31 at: i64,
32 },
33 /// The session entered a paused state and is waiting for LLM responses.
34 PauseRequested {
35 /// Details of the pause (pending prompts, kind, timestamp).
36 info: PauseInfo,
37 /// Unix timestamp (milliseconds) when the pause was requested.
38 at: i64,
39 },
40 /// A `resume` call was accepted and execution is continuing.
41 ResumeAccepted {
42 /// The pause kind (single or batch) that was resolved.
43 payload_kind: PauseKind,
44 /// Unix timestamp (milliseconds) when the resume was accepted.
45 at: i64,
46 },
47 /// A free-form diagnostic note from the engine.
48 Note {
49 /// Optional short title for the note.
50 title: Option<String>,
51 /// Note body.
52 content: String,
53 /// Unix timestamp (milliseconds) of the note.
54 at: i64,
55 },
56 /// An LLM call was dispatched for the given query.
57 LlmCallBegin {
58 /// The query identifier.
59 query_id: String,
60 /// Unix timestamp (milliseconds) when the call began.
61 at: i64,
62 },
63 /// An LLM call completed and a response was received.
64 LlmCallEnd {
65 /// The query identifier.
66 query_id: String,
67 /// Token usage for this call, if reported.
68 usage: Option<TokenUsage>,
69 /// Unix timestamp (milliseconds) when the call ended.
70 at: i64,
71 },
72 /// A periodic heartbeat tick from the execution loop.
73 Tick {
74 /// A short label describing the current execution phase.
75 phase: String,
76 /// Unix timestamp (milliseconds) of the tick.
77 at: i64,
78 },
79}
80
81/// Handle returned by [`crate::execution::ExecutionService::observe`].
82///
83/// Wraps a `tokio::sync::broadcast::Receiver<ProgressEvent>` (provided by the engine
84/// layer, Subtask 2). Multiple handles may exist concurrently; each receives the full
85/// event stream independently. When the session's broadcast sender is dropped (session
86/// terminates), `recv()` returns `Err(ObserverRecvError::Closed)`.
87///
88/// This is a **trait** to allow the engine layer (Subtask 2) to provide a concrete
89/// `BroadcastObserverHandle` struct without a circular dependency on core.
90pub trait ObserverHandle: Send {
91 /// Receive the next event, waiting asynchronously.
92 ///
93 /// Returns `Err(ObserverRecvError::Closed)` when the broadcast sender is dropped
94 /// (session terminated). Returns `Err(ObserverRecvError::Lagged(n))` when the
95 /// receiver fell behind and `n` events were skipped; subsequent calls continue
96 /// from the latest available event.
97 fn recv(
98 &mut self,
99 ) -> std::pin::Pin<
100 Box<dyn std::future::Future<Output = Result<ProgressEvent, ObserverRecvError>> + Send + '_>,
101 >;
102
103 /// Non-blocking receive. Returns an event if one is immediately available,
104 /// `Err(ObserverRecvError::Closed)` if the sender is gone, or
105 /// `Err(ObserverRecvError::Lagged(n))` on a lag event.
106 ///
107 /// Callers should treat `Err` variants consistently with [`Self::recv`].
108 fn try_recv(&mut self) -> Result<ProgressEvent, ObserverRecvError>;
109
110 /// Consume and close this handle. After calling `close` the handle is dropped
111 /// and no further events can be received.
112 fn close(self: Box<Self>);
113}
114
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes `event`, checks its top-level `kind` tag, and verifies that a
    /// deserialize -> re-serialize round trip reproduces the same JSON text.
    fn assert_roundtrip_with_kind(event: &ProgressEvent, expected_kind: &str) {
        let json = serde_json::to_string(event).expect("serialize");

        // Look at the parsed top-level object instead of substring-matching, so a
        // `kind` key nested deeper in the payload cannot satisfy the check.
        let value: serde_json::Value = serde_json::from_str(&json).expect("parse as JSON value");
        assert_eq!(
            value.get("kind").and_then(serde_json::Value::as_str),
            Some(expected_kind),
            "unexpected kind tag in JSON: {json}"
        );

        let decoded: ProgressEvent = serde_json::from_str(&json).expect("deserialize");
        let reencoded = serde_json::to_string(&decoded).expect("re-serialize");
        assert_eq!(json, reencoded, "round trip changed the JSON encoding");
    }

    /// `StateTransition` must serialize as `{"kind": "state_transition", ...}` and
    /// survive a round trip unchanged.
    #[test]
    fn progress_event_serde_tagged_kind() {
        let event = ProgressEvent::StateTransition {
            from: ExecutionStateTag::Running,
            to: ExecutionStateTag::Paused,
            at: 1_700_000_000_000,
        };

        // Pin the raw compact text as well as the parsed tag.
        let json = serde_json::to_string(&event).expect("serialize");
        assert!(
            json.contains(r#""kind":"state_transition""#),
            "expected tagged kind in JSON, got: {json}"
        );

        assert_roundtrip_with_kind(&event, "state_transition");
    }

    /// Every variant must carry its snake_case `kind` tag and round-trip through
    /// JSON without changing its encoding.
    #[test]
    fn all_progress_event_variants_serde() {
        use crate::execution::pause::{PauseInfo, PauseKind};

        let cases: Vec<(ProgressEvent, &str)> = vec![
            (
                ProgressEvent::StateTransition {
                    from: ExecutionStateTag::Running,
                    to: ExecutionStateTag::Done,
                    at: 0,
                },
                "state_transition",
            ),
            (
                ProgressEvent::PauseRequested {
                    info: PauseInfo {
                        kind: PauseKind::Single,
                        prompts: vec![],
                        paused_at: 0,
                    },
                    at: 0,
                },
                "pause_requested",
            ),
            (
                ProgressEvent::ResumeAccepted {
                    payload_kind: PauseKind::Batch,
                    at: 0,
                },
                "resume_accepted",
            ),
            (
                ProgressEvent::Note {
                    title: Some("test".into()),
                    content: "hello".into(),
                    at: 0,
                },
                "note",
            ),
            (
                ProgressEvent::LlmCallBegin {
                    query_id: "q1".into(),
                    at: 0,
                },
                "llm_call_begin",
            ),
            (
                ProgressEvent::LlmCallEnd {
                    query_id: "q1".into(),
                    usage: None,
                    at: 0,
                },
                "llm_call_end",
            ),
            (
                ProgressEvent::Tick {
                    phase: "running".into(),
                    at: 0,
                },
                "tick",
            ),
        ];

        for (event, expected_kind) in &cases {
            assert_roundtrip_with_kind(event, expected_kind);
        }
    }
}