// codex_runtime/runtime/sink.rs
use std::future::Future;
2use std::path::Path;
3use std::pin::Pin;
4use std::sync::Arc;
5
6use tokio::fs::{File, OpenOptions};
7use tokio::io::AsyncWriteExt;
8use tokio::sync::Mutex;
9
10use crate::runtime::core::io_policy::should_flush_after_n_events;
11use crate::runtime::errors::SinkError;
12use crate::runtime::events::Envelope;
13
14pub type EventSinkFuture<'a> = Pin<Box<dyn Future<Output = Result<(), SinkError>> + Send + 'a>>;
15
/// Event-count threshold used by the default [`JsonlFlushPolicy`]
/// (`EveryNEvents`).
const DEFAULT_EVENTS_PER_FLUSH: u64 = 64;
17
18pub trait EventSink: Send + Sync + 'static {
21 fn on_envelope<'a>(&'a self, envelope: &'a Envelope) -> EventSinkFuture<'a>;
24}
25
26#[derive(Debug)]
27pub struct JsonlFileSink {
28 state: Arc<Mutex<JsonlFileSinkState>>,
29}
30
31#[derive(Debug)]
32struct JsonlFileSinkState {
33 file: File,
34 pending_writes: u64,
35 flush_policy: JsonlFlushPolicy,
36}
37
/// Controls when a [`JsonlFileSink`] flushes its file after writing events.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JsonlFlushPolicy {
    /// Flush after every written event.
    EveryEvent,
    /// Flush in batches governed by an event-count threshold.
    EveryNEvents {
        /// Threshold that the pending-write count is compared against.
        events: u64,
    },
}
46
47impl Default for JsonlFlushPolicy {
48 fn default() -> Self {
49 Self::EveryNEvents {
50 events: DEFAULT_EVENTS_PER_FLUSH,
51 }
52 }
53}
54
55impl JsonlFileSink {
56 pub async fn open(path: impl AsRef<Path>) -> Result<Self, SinkError> {
59 Self::open_with_policy(path, JsonlFlushPolicy::default()).await
60 }
61
62 pub async fn open_with_policy(
65 path: impl AsRef<Path>,
66 flush_policy: JsonlFlushPolicy,
67 ) -> Result<Self, SinkError> {
68 let file = OpenOptions::new()
69 .create(true)
70 .append(true)
71 .open(path.as_ref())
72 .await
73 .map_err(|err| SinkError::Io(err.to_string()))?;
74 Ok(Self {
75 state: Arc::new(Mutex::new(JsonlFileSinkState {
76 file,
77 pending_writes: 0,
78 flush_policy,
79 })),
80 })
81 }
82
83 #[cfg(test)]
84 async fn debug_pending_writes(&self) -> u64 {
85 self.state.lock().await.pending_writes
86 }
87}
88
89impl EventSink for JsonlFileSink {
90 fn on_envelope<'a>(&'a self, envelope: &'a Envelope) -> EventSinkFuture<'a> {
93 Box::pin(async move {
94 let mut bytes = serde_json::to_vec(envelope)
95 .map_err(|err| SinkError::Serialize(err.to_string()))?;
96 bytes.push(b'\n');
97
98 let mut state = self.state.lock().await;
99 state
100 .file
101 .write_all(&bytes)
102 .await
103 .map_err(|err| SinkError::Io(err.to_string()))?;
104 state.pending_writes = state.pending_writes.saturating_add(1);
105
106 if should_flush(state.flush_policy, state.pending_writes) {
107 state
108 .file
109 .flush()
110 .await
111 .map_err(|err| SinkError::Io(err.to_string()))?;
112 state.pending_writes = 0;
113 }
114 Ok(())
115 })
116 }
117}
118
119fn should_flush(policy: JsonlFlushPolicy, pending_writes: u64) -> bool {
120 match policy {
121 JsonlFlushPolicy::EveryEvent => true,
122 JsonlFlushPolicy::EveryNEvents { events } => {
123 should_flush_after_n_events(pending_writes, events)
124 }
125 }
126}
127
#[cfg(test)]
mod tests {
    use std::fs;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    use serde_json::json;

    use super::*;
    use crate::runtime::events::{Direction, MsgKind};

    /// Per-process counter that keeps temp file names distinct even when two
    /// tests read the clock at the same instant.
    static NEXT_TEMP_ID: AtomicU64 = AtomicU64::new(0);

    /// Builds a unique path in the system temp directory for a sink file.
    ///
    /// The name combines the process id, the current time in nanoseconds and
    /// a process-wide counter. The clock alone is not enough: tests run in
    /// parallel, and because the sink opens files in append mode a shared
    /// path would let tests corrupt each other's output.
    fn temp_file_path() -> std::path::PathBuf {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("clock")
            .as_nanos();
        let pid = std::process::id();
        let serial = NEXT_TEMP_ID.fetch_add(1, Ordering::Relaxed);
        std::env::temp_dir().join(format!("runtime_sink_{pid}_{nanos}_{serial}.jsonl"))
    }

    /// Builds the inbound `turn/started` notification envelope shared by the
    /// sink tests.
    fn sample_envelope() -> Envelope {
        Envelope {
            seq: 1,
            ts_millis: 0,
            direction: Direction::Inbound,
            kind: MsgKind::Notification,
            rpc_id: None,
            method: Some(Arc::from("turn/started")),
            thread_id: Some(Arc::from("thr_1")),
            turn_id: Some(Arc::from("turn_1")),
            item_id: None,
            json: Arc::new(
                json!({"method":"turn/started","params":{"threadId":"thr_1","turnId":"turn_1"}}),
            ),
        }
    }

    /// With `EveryEvent`, one envelope produces one parseable JSON line.
    #[tokio::test(flavor = "current_thread")]
    async fn jsonl_file_sink_writes_one_line_per_envelope() {
        let path = temp_file_path();
        let sink = JsonlFileSink::open_with_policy(&path, JsonlFlushPolicy::EveryEvent)
            .await
            .expect("open sink");

        let envelope = sample_envelope();

        sink.on_envelope(&envelope).await.expect("write envelope");

        let contents = fs::read_to_string(&path).expect("read sink file");
        let line = contents.trim_end();
        assert!(!line.is_empty(), "sink line must not be empty");
        let parsed: Envelope = serde_json::from_str(line).expect("valid envelope json");
        assert_eq!(parsed.seq, 1);
        assert_eq!(parsed.method.as_deref(), Some("turn/started"));

        let _ = fs::remove_file(path);
    }

    /// With `EveryNEvents { events: 2 }`, the pending counter is 1 after the
    /// first write and back to 0 after the second.
    #[tokio::test(flavor = "current_thread")]
    async fn jsonl_file_sink_batches_flush_by_event_count() {
        let path = temp_file_path();
        let sink =
            JsonlFileSink::open_with_policy(&path, JsonlFlushPolicy::EveryNEvents { events: 2 })
                .await
                .expect("open sink");

        let envelope = sample_envelope();

        sink.on_envelope(&envelope).await.expect("write #1");
        assert_eq!(sink.debug_pending_writes().await, 1);

        sink.on_envelope(&envelope).await.expect("write #2");
        assert_eq!(sink.debug_pending_writes().await, 0);

        let _ = fs::remove_file(path);
    }

    /// The default policy is the batched one using the module default.
    #[test]
    fn default_flush_policy_is_batched() {
        assert_eq!(
            JsonlFlushPolicy::default(),
            JsonlFlushPolicy::EveryNEvents {
                events: DEFAULT_EVENTS_PER_FLUSH
            }
        );
    }
}