opi_agent/streaming_proxy.rs
1//! Streaming proxy for forwarding command/event JSONL streams (task 4.10).
2//!
3//! **Unstable 0.x API** — this module may change between minor versions without
4//! notice. Consumers MUST pin an exact version and test against upgrades.
5//!
6//! # Overview
7//!
8//! The [`StreamingProxy`] reads JSONL commands from any reader, dispatches them
9//! to a [`ProxyHandler`], and writes JSONL responses/events to any writer. It
10//! bridges the SDK command model ([`SdkCommand`]/[`SdkResponse`]) with an
11//! external transport without requiring a live provider or specific I/O backend.
12//!
13//! # Framing
14//!
15//! Strict JSONL: one JSON object per line, `\n` delimiter, flushed after each
16//! write. Empty lines are silently skipped. Malformed JSON produces a
17//! `proxy_error` response with the line number and parse error.
18//!
19//! # Backpressure
20//!
21//! Events emitted by the handler are buffered in a bounded synchronous channel
22//! (configurable via [`ProxyConfig::event_channel_capacity`], default 256).
23//! When the buffer is full, new events are dropped with a tracing warning.
24//! This prevents a slow consumer from blocking the handler.
25//!
26//! # Cancellation
27//!
28//! The proxy accepts a [`tokio_util::sync::CancellationToken`]. Cancellation is
29//! observed between blocking input reads. When observed, the proxy emits a
30//! `proxy_cancelled` event, stops reading new commands, drains remaining
31//! buffered events, and exits cleanly.
32//!
33//! # Client Disconnect
34//!
//! If a write to the output fails (broken pipe, closed connection), the proxy
//! stops processing and returns the failure as [`StreamingProxyError::Io`]. It
//! does not panic.
37//!
38//! # Secret Redaction
39//!
40//! When enabled (default), [`SecretRedactor`] scans event JSON for common
41//! secret patterns:
42//! - `sk-ant-*` (Anthropic API keys)
43//! - `sk-*` (OpenAI API keys)
44//! - Bearer tokens / JWTs (`eyJ*`)
45//! - JSON fields named `password`, `secret`, `token`, `api_key`, `apikey`,
46//! `private_key`, `access_token`, `refresh_token`
47//!
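//! Matching values are replaced with `[REDACTED]`. Custom value patterns can be
//! supplied via [`SecretRedactor::new`]; they replace the default value
//! patterns, while the default sensitive field names stay in effect.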
50//!
51//! # Protocol Sequence
52//!
53//! ```text
54//! → {"type":"proxy_ready","schema_version":2} // first output
55//! ← {"type":"session_info"} // command from client
56//! → {"type":"response","command":"session_info","success":true,"data":{...}}
57//! → {"type":"AgentStart"} // async event
58//! → {"type":"AgentEnd","messages":[...]} // async event
59//! ← {"type":"quit"} // client ends session
60//! → {"type":"response","command":"quit","success":true}
61//! // proxy exits
62//! ```
63
64use std::io::{BufRead, Write};
65
66use crate::sdk::{SDK_SCHEMA_VERSION, SdkCommand, SdkResponse};
67use serde_json::json;
68use tokio_util::sync::CancellationToken;
69
70// ---------------------------------------------------------------------------
71// ProxyEvent
72// ---------------------------------------------------------------------------
73
74/// An event emitted through the proxy's event channel.
75#[derive(Debug, Clone)]
76pub enum ProxyEvent {
77 /// An agent event to forward as JSONL.
78 Agent(serde_json::Value),
79}
80
81// ---------------------------------------------------------------------------
82// ProxyHandler trait
83// ---------------------------------------------------------------------------
84
85/// Handler for incoming proxy commands.
86///
87/// Implementations receive a parsed [`SdkCommand`] and return an [`SdkResponse`].
88/// They can also emit [`ProxyEvent`]s through the provided callback for async
89/// event forwarding (e.g., streaming agent events).
90pub trait ProxyHandler: Send + Sync {
91 /// Handle a single command. Use `event_sink` to emit async events.
92 fn handle_command(&self, command: SdkCommand, event_sink: &dyn Fn(ProxyEvent)) -> SdkResponse;
93}
94
95// ---------------------------------------------------------------------------
96// ProxyConfig
97// ---------------------------------------------------------------------------
98
/// Tunable settings for the streaming proxy.
#[derive(Clone, Debug)]
pub struct ProxyConfig {
    /// Capacity of the bounded channel that buffers handler events.
    ///
    /// Once the buffer is full, further events are dropped and a tracing
    /// warning is emitted. Default: 256.
    pub event_channel_capacity: usize,

    /// Whether outgoing events are passed through secret redaction.
    /// Default: true.
    pub redact_secrets: bool,
}

impl Default for ProxyConfig {
    /// Build the default configuration: a 256-slot event buffer with secret
    /// redaction switched on.
    fn default() -> Self {
        ProxyConfig {
            event_channel_capacity: 256,
            redact_secrets: true,
        }
    }
}
121
122// ---------------------------------------------------------------------------
123// StreamingProxy
124// ---------------------------------------------------------------------------
125
126/// A streaming proxy that reads JSONL commands, dispatches to a handler, and
127/// writes JSONL responses/events.
128///
129/// The proxy is transport-agnostic: it works with any [`BufRead`] + [`Write`]
130/// pair (stdin/stdout, TCP streams, Unix sockets, etc.).
131pub struct StreamingProxy<H> {
132 handler: H,
133 config: ProxyConfig,
134}
135
136impl<H: ProxyHandler> StreamingProxy<H> {
137 /// Create a new proxy with the given handler and configuration.
138 pub fn new(handler: H, config: ProxyConfig) -> Self {
139 Self { handler, config }
140 }
141
142 /// Run the proxy until input is exhausted, a quit command is received,
143 /// or cancellation fires.
144 ///
145 /// Returns the writer on success, or an error if the proxy failed.
146 pub fn run<R: BufRead, W: Write>(
147 self,
148 reader: R,
149 writer: W,
150 cancel: CancellationToken,
151 ) -> Result<W, StreamingProxyError> {
152 let redactor = if self.config.redact_secrets {
153 Some(SecretRedactor::default())
154 } else {
155 None
156 };
157
158 // Event channel for event forwarding.
159 let (event_tx, event_rx) =
160 std::sync::mpsc::sync_channel::<ProxyEvent>(self.config.event_channel_capacity);
161
162 let mut engine = ProxyEngine {
163 reader,
164 writer,
165 handler: self.handler,
166 redactor,
167 event_rx,
168 event_tx,
169 cancel,
170 line_number: 0,
171 };
172
173 // Emit proxy_ready header
174 let ready = json!({
175 "type": "proxy_ready",
176 "schema_version": SDK_SCHEMA_VERSION,
177 });
178 engine.write_json(&ready)?;
179
180 engine.run_loop()
181 }
182}
183
184// ---------------------------------------------------------------------------
185// ProxyEngine (internal)
186// ---------------------------------------------------------------------------
187
188struct ProxyEngine<R, W, H> {
189 reader: R,
190 writer: W,
191 handler: H,
192 redactor: Option<SecretRedactor>,
193 event_rx: std::sync::mpsc::Receiver<ProxyEvent>,
194 event_tx: std::sync::mpsc::SyncSender<ProxyEvent>,
195 cancel: CancellationToken,
196 line_number: usize,
197}
198
199impl<R: BufRead, W: Write, H: ProxyHandler> ProxyEngine<R, W, H> {
200 fn run_loop(mut self) -> Result<W, StreamingProxyError> {
201 let mut line = String::new();
202
203 loop {
204 // Check cancellation
205 if self.cancel.is_cancelled() {
206 let _ = self.write_json(&json!({"type": "proxy_cancelled"}));
207 self.drain_events()?;
208 return Ok(self.writer);
209 }
210
211 line.clear();
212 match self.reader.read_line(&mut line) {
213 Ok(0) => {
214 // EOF
215 self.drain_events()?;
216 return Ok(self.writer);
217 }
218 Ok(_n) => {}
219 Err(e) => return Err(StreamingProxyError::Io(e.to_string())),
220 };
221
222 self.line_number += 1;
223 let trimmed = line.trim();
224
225 // Skip empty lines
226 if trimmed.is_empty() {
227 continue;
228 }
229
230 // Parse command
231 let command: SdkCommand = match serde_json::from_str(trimmed) {
232 Ok(cmd) => cmd,
233 Err(e) => {
234 let error_resp = json!({
235 "type": "proxy_error",
236 "line_number": self.line_number,
237 "error": format!("parse error: {e}"),
238 "raw": trimmed,
239 });
240 self.write_json(&error_resp)?;
241 continue;
242 }
243 };
244
245 // Dispatch to handler
246 let tx = self.event_tx.clone();
247 let sink = move |event: ProxyEvent| {
248 // Apply backpressure: if channel full, drop the event
249 if tx.try_send(event).is_err() {
250 tracing::warn!("proxy event channel full, dropping event");
251 }
252 };
253
254 let response = self.handler.handle_command(command, &sink);
255 self.write_json(
256 &serde_json::to_value(&response)
257 .unwrap_or(json!({"type":"response","success":false})),
258 )?;
259
260 // Drain any events the handler emitted
261 self.drain_events()?;
262
263 // Check for quit
264 if response.command == "quit" {
265 return Ok(self.writer);
266 }
267 }
268 }
269
270 /// Drain all buffered events and write them as JSONL.
271 fn drain_events(&mut self) -> Result<(), StreamingProxyError> {
272 while let Ok(event) = self.event_rx.try_recv() {
273 match event {
274 ProxyEvent::Agent(mut value) => {
275 if let Some(ref redactor) = self.redactor {
276 value = redactor.redact(&value);
277 }
278 self.write_json(&value)?;
279 }
280 }
281 }
282 Ok(())
283 }
284
285 /// Write a JSON value as a single JSONL line.
286 fn write_json(&mut self, value: &serde_json::Value) -> Result<(), StreamingProxyError> {
287 let mut line = serde_json::to_string(value).unwrap_or_else(|_| {
288 r#"{"type":"proxy_error","error":"serialization failed"}"#.to_owned()
289 });
290 line.push('\n');
291 self.writer
292 .write_all(line.as_bytes())
293 .map_err(|e| StreamingProxyError::Io(e.to_string()))?;
294 self.writer
295 .flush()
296 .map_err(|e| StreamingProxyError::Io(e.to_string()))?;
297 Ok(())
298 }
299}
300
301// ---------------------------------------------------------------------------
302// SecretRedactor
303// ---------------------------------------------------------------------------
304
305/// Redacts common secret patterns from JSON event payloads.
306///
307/// # Default patterns
308///
309/// The default instance matches:
310/// - API keys in values: `sk-ant-*`, `sk-*`
311/// - JWT/Bearer tokens in values: `eyJ*`
312/// - Sensitive JSON field names: `password`, `secret`, `token`, `api_key`,
313/// `apikey`, `private_key`, `access_token`, `refresh_token`
314///
315/// Custom patterns can be provided via [`SecretRedactor::new`].
316#[derive(Debug, Clone)]
317pub struct SecretRedactor {
318 /// Regex-like patterns for value matching (applied to string values).
319 value_patterns: Vec<String>,
320 /// Compiled value patterns.
321 value_regexes: Vec<regex::Regex>,
322 /// Field names whose values should always be redacted.
323 sensitive_fields: Vec<String>,
324}
325
326impl Default for SecretRedactor {
327 fn default() -> Self {
328 let value_patterns = vec![
329 // Anthropic API keys
330 r"sk-ant-[a-zA-Z0-9]{20,}".to_owned(),
331 // OpenAI-style API keys
332 r"sk-[a-zA-Z0-9]{20,}".to_owned(),
333 // JWT/Bearer tokens (eyJ header plus two token segments)
334 r"eyJ[a-zA-Z0-9_-]{10,}\.[a-zA-Z0-9_-]+\.[a-zA-Z0-9_-]+".to_owned(),
335 ];
336
337 Self {
338 value_regexes: compile_value_patterns(&value_patterns),
339 value_patterns,
340 sensitive_fields: vec![
341 "password".to_owned(),
342 "secret".to_owned(),
343 "token".to_owned(),
344 "api_key".to_owned(),
345 "apikey".to_owned(),
346 "private_key".to_owned(),
347 "access_token".to_owned(),
348 "refresh_token".to_owned(),
349 ],
350 }
351 }
352}
353
354fn compile_value_patterns(patterns: &[String]) -> Vec<regex::Regex> {
355 patterns
356 .iter()
357 .filter_map(|pattern| match regex::Regex::new(pattern) {
358 Ok(regex) => Some(regex),
359 Err(err) => {
360 tracing::warn!(pattern, error = %err, "invalid secret redaction pattern ignored");
361 None
362 }
363 })
364 .collect()
365}
366
367impl SecretRedactor {
368 /// Create a redactor with custom value patterns (regex strings).
369 ///
370 /// The default sensitive field names are still included.
371 pub fn new(value_patterns: Vec<String>) -> Self {
372 Self {
373 value_regexes: compile_value_patterns(&value_patterns),
374 value_patterns,
375 sensitive_fields: Self::default().sensitive_fields,
376 }
377 }
378
379 /// Return the active value patterns.
380 pub fn patterns(&self) -> &[String] {
381 &self.value_patterns
382 }
383
384 /// Redact secrets from a JSON value, returning a cleaned copy.
385 pub fn redact(&self, value: &serde_json::Value) -> serde_json::Value {
386 self.redact_value(value)
387 }
388
389 fn redact_value(&self, value: &serde_json::Value) -> serde_json::Value {
390 match value {
391 serde_json::Value::Object(map) => {
392 let mut new_map = serde_json::Map::new();
393 for (k, v) in map {
394 if self.is_sensitive_field(k) {
395 new_map.insert(
396 k.clone(),
397 serde_json::Value::String("[REDACTED]".to_owned()),
398 );
399 } else {
400 new_map.insert(k.clone(), self.redact_value(v));
401 }
402 }
403 serde_json::Value::Object(new_map)
404 }
405 serde_json::Value::Array(arr) => {
406 serde_json::Value::Array(arr.iter().map(|v| self.redact_value(v)).collect())
407 }
408 serde_json::Value::String(s) => {
409 if self.matches_value_pattern(s) {
410 serde_json::Value::String("[REDACTED]".to_owned())
411 } else {
412 serde_json::Value::String(s.clone())
413 }
414 }
415 other => other.clone(),
416 }
417 }
418
419 fn is_sensitive_field(&self, name: &str) -> bool {
420 let lower = name.to_lowercase();
421 self.sensitive_fields.iter().any(|f| f == &lower)
422 }
423
424 fn matches_value_pattern(&self, value: &str) -> bool {
425 self.value_regexes.iter().any(|regex| regex.is_match(value))
426 }
427}
428
429// ---------------------------------------------------------------------------
430// Error type
431// ---------------------------------------------------------------------------
432
/// Failure reported by the streaming proxy.
#[derive(Debug)]
pub enum StreamingProxyError {
    /// Reading the input or writing the output failed; holds the message text
    /// of the underlying error.
    Io(String),
}

impl std::fmt::Display for StreamingProxyError {
    /// Render the error as `proxy I/O error: <message>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Io(msg) => {
                f.write_str("proxy I/O error: ")?;
                f.write_str(msg)
            }
        }
    }
}

impl std::error::Error for StreamingProxyError {}