Skip to main content

codex_runtime/runtime/
sink.rs

1use std::future::Future;
2use std::path::Path;
3use std::pin::Pin;
4use std::sync::Arc;
5
6use tokio::fs::{File, OpenOptions};
7use tokio::io::AsyncWriteExt;
8use tokio::sync::Mutex;
9
10use crate::runtime::core::io_policy::should_flush_after_n_events;
11use crate::runtime::errors::SinkError;
12use crate::runtime::events::Envelope;
13
/// Boxed, `Send` future returned by [`EventSink::on_envelope`].
/// It may borrow its inputs for `'a` and resolves to `Ok(())` or a [`SinkError`].
pub type EventSinkFuture<'a> = Pin<Box<dyn Future<Output = Result<(), SinkError>> + Send + 'a>>;

/// Batch size used by `JsonlFlushPolicy::default()` (flush after this many writes).
const DEFAULT_EVENTS_PER_FLUSH: u64 = 64;
17
/// Optional event persistence/export hook.
///
/// Implementations should avoid panics and return `SinkError` on write failures.
/// The `Send + Sync + 'static` bounds allow one sink instance to be shared across
/// threads/tasks (e.g. behind an `Arc`).
pub trait EventSink: Send + Sync + 'static {
    /// Consume one envelope.
    ///
    /// The returned [`EventSinkFuture`] may borrow both `self` and `envelope` for `'a`
    /// and must be `Send`.
    /// Side effects: sink-specific I/O. Complexity depends on implementation.
    fn on_envelope<'a>(&'a self, envelope: &'a Envelope) -> EventSinkFuture<'a>;
}
25
/// [`EventSink`] that appends each [`Envelope`] as one JSON line to a file.
///
/// The file handle and flush bookkeeping live behind an async (`tokio`) mutex, so each
/// write (and any flush it triggers) runs without interleaving with other writers.
#[derive(Debug)]
pub struct JsonlFileSink {
    // File handle plus flush counters; locked for every write.
    state: Arc<Mutex<JsonlFileSinkState>>,
}
30
/// Mutable state guarded by the mutex inside [`JsonlFileSink`].
#[derive(Debug)]
struct JsonlFileSinkState {
    // Handle opened with create + append by `open_with_policy`.
    file: File,
    // Writes issued since the last successful flush; reset to 0 after each flush.
    pending_writes: u64,
    // Consulted after every write to decide whether to flush (see `should_flush`).
    flush_policy: JsonlFlushPolicy,
}
37
/// Durability/throughput tradeoff for JSONL sink flushing.
/// - `EveryEvent`: flush each event write (lowest data-at-risk, highest overhead).
/// - `EveryNEvents`: flush after N writes (higher throughput, up to N-1 events buffered in process).
///   `events` is N. The exact threshold check (including how `events == 0` behaves) is
///   delegated to `io_policy::should_flush_after_n_events`.
///
/// Neither policy calls `sync_all`/`sync_data`. A flush hands data to the OS, but it may
/// still sit in OS caches rather than on stable storage.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum JsonlFlushPolicy {
    EveryEvent,
    EveryNEvents { events: u64 },
}
46
47impl Default for JsonlFlushPolicy {
48    fn default() -> Self {
49        Self::EveryNEvents {
50            events: DEFAULT_EVENTS_PER_FLUSH,
51        }
52    }
53}
54
55impl JsonlFileSink {
56    /// Open or create JSONL sink file in append mode.
57    /// Side effects: filesystem open/create. Complexity: O(1).
58    pub async fn open(path: impl AsRef<Path>) -> Result<Self, SinkError> {
59        Self::open_with_policy(path, JsonlFlushPolicy::default()).await
60    }
61
62    /// Open JSONL sink with explicit flush policy.
63    /// Side effects: filesystem open/create. Complexity: O(1).
64    pub async fn open_with_policy(
65        path: impl AsRef<Path>,
66        flush_policy: JsonlFlushPolicy,
67    ) -> Result<Self, SinkError> {
68        let file = OpenOptions::new()
69            .create(true)
70            .append(true)
71            .open(path.as_ref())
72            .await
73            .map_err(|err| SinkError::Io(err.to_string()))?;
74        Ok(Self {
75            state: Arc::new(Mutex::new(JsonlFileSinkState {
76                file,
77                pending_writes: 0,
78                flush_policy,
79            })),
80        })
81    }
82
83    #[cfg(test)]
84    async fn debug_pending_writes(&self) -> u64 {
85        self.state.lock().await.pending_writes
86    }
87}
88
89impl EventSink for JsonlFileSink {
90    /// Serialize one envelope and append a trailing newline.
91    /// Allocation: one JSON byte vector. Complexity: O(n), n = serialized envelope bytes.
92    fn on_envelope<'a>(&'a self, envelope: &'a Envelope) -> EventSinkFuture<'a> {
93        Box::pin(async move {
94            let mut bytes = serde_json::to_vec(envelope)
95                .map_err(|err| SinkError::Serialize(err.to_string()))?;
96            bytes.push(b'\n');
97
98            let mut state = self.state.lock().await;
99            state
100                .file
101                .write_all(&bytes)
102                .await
103                .map_err(|err| SinkError::Io(err.to_string()))?;
104            state.pending_writes = state.pending_writes.saturating_add(1);
105
106            if should_flush(state.flush_policy, state.pending_writes) {
107                state
108                    .file
109                    .flush()
110                    .await
111                    .map_err(|err| SinkError::Io(err.to_string()))?;
112                state.pending_writes = 0;
113            }
114            Ok(())
115        })
116    }
117}
118
119fn should_flush(policy: JsonlFlushPolicy, pending_writes: u64) -> bool {
120    match policy {
121        JsonlFlushPolicy::EveryEvent => true,
122        JsonlFlushPolicy::EveryNEvents { events } => {
123            should_flush_after_n_events(pending_writes, events)
124        }
125    }
126}
127
#[cfg(test)]
mod tests {
    use std::fs;
    use std::path::{Path, PathBuf};
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    use serde_json::json;

    use super::*;
    use crate::runtime::events::{Direction, MsgKind};

    /// Temporary JSONL path that is deleted on drop, so tests clean up even when they panic.
    struct TempPath(PathBuf);

    impl TempPath {
        /// Build a path that is unique per test in this run.
        ///
        /// Wall-clock nanos alone can collide when tests run in parallel on platforms with
        /// coarse clocks, so the process id and a per-process counter are mixed in.
        fn new() -> Self {
            static COUNTER: AtomicU64 = AtomicU64::new(0);
            let nanos = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .expect("clock")
                .as_nanos();
            let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
            let pid = std::process::id();
            Self(std::env::temp_dir().join(format!("runtime_sink_{pid}_{nanos}_{seq}.jsonl")))
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TempPath {
        fn drop(&mut self) {
            // Best-effort cleanup; a missing file is not a test failure.
            let _ = fs::remove_file(&self.0);
        }
    }

    /// Representative inbound `turn/started` notification used by the sink tests.
    fn sample_envelope() -> Envelope {
        Envelope {
            seq: 1,
            ts_millis: 0,
            direction: Direction::Inbound,
            kind: MsgKind::Notification,
            rpc_id: None,
            method: Some(Arc::from("turn/started")),
            thread_id: Some(Arc::from("thr_1")),
            turn_id: Some(Arc::from("turn_1")),
            item_id: None,
            json: Arc::new(
                json!({"method":"turn/started","params":{"threadId":"thr_1","turnId":"turn_1"}}),
            ),
        }
    }

    #[tokio::test(flavor = "current_thread")]
    async fn jsonl_file_sink_writes_one_line_per_envelope() {
        // Declared before `sink` so the sink (and its file handle) is dropped first.
        let temp = TempPath::new();
        let sink = JsonlFileSink::open_with_policy(temp.path(), JsonlFlushPolicy::EveryEvent)
            .await
            .expect("open sink");
        let envelope = sample_envelope();

        sink.on_envelope(&envelope).await.expect("write envelope #1");
        sink.on_envelope(&envelope).await.expect("write envelope #2");

        let contents = fs::read_to_string(temp.path()).expect("read sink file");
        assert!(contents.ends_with('\n'), "every record must be newline-terminated");
        let lines: Vec<&str> = contents.lines().collect();
        assert_eq!(lines.len(), 2, "expected exactly one line per envelope");
        for line in lines {
            assert!(!line.is_empty(), "sink line must not be empty");
            let parsed: Envelope = serde_json::from_str(line).expect("valid envelope json");
            assert_eq!(parsed.seq, 1);
            assert_eq!(parsed.method.as_deref(), Some("turn/started"));
        }
    }

    #[tokio::test(flavor = "current_thread")]
    async fn jsonl_file_sink_batches_flush_by_event_count() {
        let temp = TempPath::new();
        let sink =
            JsonlFileSink::open_with_policy(temp.path(), JsonlFlushPolicy::EveryNEvents { events: 2 })
                .await
                .expect("open sink");
        let envelope = sample_envelope();

        sink.on_envelope(&envelope).await.expect("write #1");
        assert_eq!(sink.debug_pending_writes().await, 1);

        sink.on_envelope(&envelope).await.expect("write #2");
        assert_eq!(sink.debug_pending_writes().await, 0);

        // The policy-driven flush has completed, so both records must be visible.
        let contents = fs::read_to_string(temp.path()).expect("read sink file");
        assert_eq!(contents.lines().count(), 2);
    }

    #[test]
    fn default_flush_policy_is_batched() {
        assert_eq!(
            JsonlFlushPolicy::default(),
            JsonlFlushPolicy::EveryNEvents {
                events: DEFAULT_EVENTS_PER_FLUSH
            }
        );
    }
}