Skip to main content

algocline_core/execution/
progress.rs

1//! Progress observation types for the `ExecutionService` layer.
2
3use serde::{Deserialize, Serialize};
4
5use super::error::ObserverRecvError;
6use super::pause::{PauseInfo, PauseKind};
7use super::state::ExecutionStateTag;
8use crate::TokenUsage;
9
10/// An event emitted on the per-session broadcast channel.
11///
12/// All variants carry an `at` field (Unix timestamp in milliseconds).
13/// The channel is emitted from [`crate::execution::ExecutionService`] implementations
14/// inside the engine layer; multiple independent observers may subscribe via
15/// [`crate::execution::ExecutionService::observe`] without affecting one another
16/// (crux: Sink-free broadcast Progress fan-out).
17///
18/// # Serde
19/// Uses `#[serde(tag = "kind", rename_all = "snake_case")]` internally tagged
20/// representation, so JSON consumers see `{"kind": "state_transition", ...}`.
21#[derive(Debug, Clone, Serialize, Deserialize)]
22#[serde(tag = "kind", rename_all = "snake_case")]
23pub enum ProgressEvent {
24    /// The session transitioned from one state to another.
25    StateTransition {
26        /// The previous state tag.
27        from: ExecutionStateTag,
28        /// The new state tag.
29        to: ExecutionStateTag,
30        /// Unix timestamp (milliseconds) of the transition.
31        at: i64,
32    },
33    /// The session entered a paused state and is waiting for LLM responses.
34    PauseRequested {
35        /// Details of the pause (pending prompts, kind, timestamp).
36        info: PauseInfo,
37        /// Unix timestamp (milliseconds) when the pause was requested.
38        at: i64,
39    },
40    /// A `resume` call was accepted and execution is continuing.
41    ResumeAccepted {
42        /// The pause kind (single or batch) that was resolved.
43        payload_kind: PauseKind,
44        /// Unix timestamp (milliseconds) when the resume was accepted.
45        at: i64,
46    },
47    /// A free-form diagnostic note from the engine.
48    Note {
49        /// Optional short title for the note.
50        title: Option<String>,
51        /// Note body.
52        content: String,
53        /// Unix timestamp (milliseconds) of the note.
54        at: i64,
55    },
56    /// An LLM call was dispatched for the given query.
57    LlmCallBegin {
58        /// The query identifier.
59        query_id: String,
60        /// Unix timestamp (milliseconds) when the call began.
61        at: i64,
62    },
63    /// An LLM call completed and a response was received.
64    LlmCallEnd {
65        /// The query identifier.
66        query_id: String,
67        /// Token usage for this call, if reported.
68        usage: Option<TokenUsage>,
69        /// Unix timestamp (milliseconds) when the call ended.
70        at: i64,
71    },
72    /// A periodic heartbeat tick from the execution loop.
73    Tick {
74        /// A short label describing the current execution phase.
75        phase: String,
76        /// Unix timestamp (milliseconds) of the tick.
77        at: i64,
78    },
79}
80
81/// Handle returned by [`crate::execution::ExecutionService::observe`].
82///
83/// Wraps a `tokio::sync::broadcast::Receiver<ProgressEvent>` (provided by the engine
84/// layer, Subtask 2).  Multiple handles may exist concurrently; each receives the full
85/// event stream independently.  When the session's broadcast sender is dropped (session
86/// terminates), `recv()` returns `Err(ObserverRecvError::Closed)`.
87///
88/// This is a **trait** to allow the engine layer (Subtask 2) to provide a concrete
89/// `BroadcastObserverHandle` struct without a circular dependency on core.
90pub trait ObserverHandle: Send {
91    /// Receive the next event, waiting asynchronously.
92    ///
93    /// Returns `Err(ObserverRecvError::Closed)` when the broadcast sender is dropped
94    /// (session terminated).  Returns `Err(ObserverRecvError::Lagged(n))` when the
95    /// receiver fell behind and `n` events were skipped; subsequent calls continue
96    /// from the latest available event.
97    fn recv(
98        &mut self,
99    ) -> std::pin::Pin<
100        Box<dyn std::future::Future<Output = Result<ProgressEvent, ObserverRecvError>> + Send + '_>,
101    >;
102
103    /// Non-blocking receive.  Returns an event if one is immediately available,
104    /// `Err(ObserverRecvError::Closed)` if the sender is gone, or
105    /// `Err(ObserverRecvError::Lagged(n))` on a lag event.
106    ///
107    /// Callers should treat `Err` variants consistently with [`Self::recv`].
108    fn try_recv(&mut self) -> Result<ProgressEvent, ObserverRecvError>;
109
110    /// Consume and close this handle.  After calling `close` the handle is dropped
111    /// and no further events can be received.
112    fn close(self: Box<Self>);
113}
114
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes `event`, deserializes the JSON back into a
    /// [`ProgressEvent`], re-serializes it, and asserts that both JSON strings
    /// are identical.
    ///
    /// Returns the first serialization so callers can make further assertions
    /// about the wire format.  JSON strings are compared because
    /// `ProgressEvent` does not derive `PartialEq`.
    fn assert_json_roundtrip(event: &ProgressEvent) -> String {
        let json = serde_json::to_string(event).expect("serialize");
        let decoded: ProgressEvent = serde_json::from_str(&json).expect("deserialize");
        let json_again = serde_json::to_string(&decoded).expect("re-serialize");
        assert_eq!(json, json_again, "roundtrip changed the JSON encoding");
        json
    }

    #[test]
    fn progress_event_serde_tagged_kind() {
        // StateTransition must serialize as {"kind": "state_transition", ...}
        let event = ProgressEvent::StateTransition {
            from: ExecutionStateTag::Running,
            to: ExecutionStateTag::Paused,
            at: 1_700_000_000_000,
        };
        let json = assert_json_roundtrip(&event);
        assert!(
            json.contains(r#""kind":"state_transition""#),
            "expected tagged kind in JSON, got: {json}"
        );
    }

    #[test]
    fn all_progress_event_variants_serde() {
        use crate::execution::pause::{PauseInfo, PauseKind};

        // Each variant is paired with the snake_case tag it must carry on the
        // wire, so a renamed variant or changed serde attribute is caught here.
        let cases: Vec<(&str, ProgressEvent)> = vec![
            (
                "state_transition",
                ProgressEvent::StateTransition {
                    from: ExecutionStateTag::Running,
                    to: ExecutionStateTag::Done,
                    at: 0,
                },
            ),
            (
                "pause_requested",
                ProgressEvent::PauseRequested {
                    info: PauseInfo {
                        kind: PauseKind::Single,
                        prompts: vec![],
                        paused_at: 0,
                    },
                    at: 0,
                },
            ),
            (
                "resume_accepted",
                ProgressEvent::ResumeAccepted {
                    payload_kind: PauseKind::Batch,
                    at: 0,
                },
            ),
            (
                "note",
                ProgressEvent::Note {
                    title: Some("test".into()),
                    content: "hello".into(),
                    at: 0,
                },
            ),
            (
                "llm_call_begin",
                ProgressEvent::LlmCallBegin {
                    query_id: "q1".into(),
                    at: 0,
                },
            ),
            (
                "llm_call_end",
                ProgressEvent::LlmCallEnd {
                    query_id: "q1".into(),
                    usage: None,
                    at: 0,
                },
            ),
            (
                "tick",
                ProgressEvent::Tick {
                    phase: "running".into(),
                    at: 0,
                },
            ),
        ];

        for (expected_kind, event) in cases {
            let json = assert_json_roundtrip(&event);
            let tag = format!(r#""kind":"{expected_kind}""#);
            assert!(
                json.contains(&tag),
                "expected kind `{expected_kind}` in JSON, got: {json}"
            );
        }
    }
}