Skip to main content

opi_agent/
streaming_proxy.rs

1//! Streaming proxy for forwarding command/event JSONL streams (task 4.10).
2//!
3//! **Unstable 0.x API** — this module may change between minor versions without
4//! notice. Consumers MUST pin an exact version and test against upgrades.
5//!
6//! # Overview
7//!
8//! The [`StreamingProxy`] reads JSONL commands from any reader, dispatches them
9//! to a [`ProxyHandler`], and writes JSONL responses/events to any writer. It
10//! bridges the SDK command model ([`SdkCommand`]/[`SdkResponse`]) with an
11//! external transport without requiring a live provider or specific I/O backend.
12//!
13//! # Framing
14//!
15//! Strict JSONL: one JSON object per line, `\n` delimiter, flushed after each
16//! write. Empty lines are silently skipped. Malformed JSON produces a
17//! `proxy_error` response with the line number and parse error.
18//!
19//! # Backpressure
20//!
21//! Events emitted by the handler are buffered in a bounded synchronous channel
22//! (configurable via [`ProxyConfig::event_channel_capacity`], default 256).
23//! When the buffer is full, new events are dropped with a tracing warning.
24//! This prevents a slow consumer from blocking the handler.
25//!
26//! # Cancellation
27//!
28//! The proxy accepts a [`tokio_util::sync::CancellationToken`]. Cancellation is
29//! observed between blocking input reads. When observed, the proxy emits a
30//! `proxy_cancelled` event, stops reading new commands, drains remaining
31//! buffered events, and exits cleanly.
32//!
33//! # Client Disconnect
34//!
35//! If a write to the output fails (broken pipe, closed connection), the proxy
36//! stops processing, logs the error, and returns. It does not panic.
37//!
38//! # Secret Redaction
39//!
40//! When enabled (default), [`SecretRedactor`] scans event JSON for common
41//! secret patterns:
42//! - `sk-ant-*` (Anthropic API keys)
43//! - `sk-*` (OpenAI API keys)
44//! - Bearer tokens / JWTs (`eyJ*`)
45//! - JSON fields named `password`, `secret`, `token`, `api_key`, `apikey`,
46//!   `private_key`, `access_token`, `refresh_token`
47//!
48//! Matching values are replaced with `[REDACTED]`. Custom patterns can be
49//! added via [`SecretRedactor::new`].
50//!
51//! # Protocol Sequence
52//!
53//! ```text
54//! → {"type":"proxy_ready","schema_version":2}     // first output
55//! ← {"type":"session_info"}                       // command from client
56//! → {"type":"response","command":"session_info","success":true,"data":{...}}
57//! → {"type":"AgentStart"}                         // async event
58//! → {"type":"AgentEnd","messages":[...]}           // async event
59//! ← {"type":"quit"}                               // client ends session
60//! → {"type":"response","command":"quit","success":true}
61//!                                                 // proxy exits
62//! ```
63
64use std::io::{BufRead, Write};
65
66use crate::sdk::{SDK_SCHEMA_VERSION, SdkCommand, SdkResponse};
67use serde_json::json;
68use tokio_util::sync::CancellationToken;
69
70// ---------------------------------------------------------------------------
71// ProxyEvent
72// ---------------------------------------------------------------------------
73
74/// An event emitted through the proxy's event channel.
75#[derive(Debug, Clone)]
76pub enum ProxyEvent {
77    /// An agent event to forward as JSONL.
78    Agent(serde_json::Value),
79}
80
81// ---------------------------------------------------------------------------
82// ProxyHandler trait
83// ---------------------------------------------------------------------------
84
85/// Handler for incoming proxy commands.
86///
87/// Implementations receive a parsed [`SdkCommand`] and return an [`SdkResponse`].
88/// They can also emit [`ProxyEvent`]s through the provided callback for async
89/// event forwarding (e.g., streaming agent events).
90pub trait ProxyHandler: Send + Sync {
91    /// Handle a single command. Use `event_sink` to emit async events.
92    fn handle_command(&self, command: SdkCommand, event_sink: &dyn Fn(ProxyEvent)) -> SdkResponse;
93}
94
95// ---------------------------------------------------------------------------
96// ProxyConfig
97// ---------------------------------------------------------------------------
98
99/// Configuration for the streaming proxy.
100#[derive(Debug, Clone)]
101pub struct ProxyConfig {
102    /// Bounded channel capacity for event buffering.
103    ///
104    /// When full, backpressure is applied (new events are dropped with a
105    /// tracing warning). Default: 256.
106    pub event_channel_capacity: usize,
107
108    /// Whether to apply secret redaction to outgoing events.
109    /// Default: true.
110    pub redact_secrets: bool,
111}
112
113impl Default for ProxyConfig {
114    fn default() -> Self {
115        Self {
116            event_channel_capacity: 256,
117            redact_secrets: true,
118        }
119    }
120}
121
122// ---------------------------------------------------------------------------
123// StreamingProxy
124// ---------------------------------------------------------------------------
125
126/// A streaming proxy that reads JSONL commands, dispatches to a handler, and
127/// writes JSONL responses/events.
128///
129/// The proxy is transport-agnostic: it works with any [`BufRead`] + [`Write`]
130/// pair (stdin/stdout, TCP streams, Unix sockets, etc.).
131pub struct StreamingProxy<H> {
132    handler: H,
133    config: ProxyConfig,
134}
135
136impl<H: ProxyHandler> StreamingProxy<H> {
137    /// Create a new proxy with the given handler and configuration.
138    pub fn new(handler: H, config: ProxyConfig) -> Self {
139        Self { handler, config }
140    }
141
142    /// Run the proxy until input is exhausted, a quit command is received,
143    /// or cancellation fires.
144    ///
145    /// Returns the writer on success, or an error if the proxy failed.
146    pub fn run<R: BufRead, W: Write>(
147        self,
148        reader: R,
149        writer: W,
150        cancel: CancellationToken,
151    ) -> Result<W, StreamingProxyError> {
152        let redactor = if self.config.redact_secrets {
153            Some(SecretRedactor::default())
154        } else {
155            None
156        };
157
158        // Event channel for event forwarding.
159        let (event_tx, event_rx) =
160            std::sync::mpsc::sync_channel::<ProxyEvent>(self.config.event_channel_capacity);
161
162        let mut engine = ProxyEngine {
163            reader,
164            writer,
165            handler: self.handler,
166            redactor,
167            event_rx,
168            event_tx,
169            cancel,
170            line_number: 0,
171        };
172
173        // Emit proxy_ready header
174        let ready = json!({
175            "type": "proxy_ready",
176            "schema_version": SDK_SCHEMA_VERSION,
177        });
178        engine.write_json(&ready)?;
179
180        engine.run_loop()
181    }
182}
183
184// ---------------------------------------------------------------------------
185// ProxyEngine (internal)
186// ---------------------------------------------------------------------------
187
188struct ProxyEngine<R, W, H> {
189    reader: R,
190    writer: W,
191    handler: H,
192    redactor: Option<SecretRedactor>,
193    event_rx: std::sync::mpsc::Receiver<ProxyEvent>,
194    event_tx: std::sync::mpsc::SyncSender<ProxyEvent>,
195    cancel: CancellationToken,
196    line_number: usize,
197}
198
199impl<R: BufRead, W: Write, H: ProxyHandler> ProxyEngine<R, W, H> {
200    fn run_loop(mut self) -> Result<W, StreamingProxyError> {
201        let mut line = String::new();
202
203        loop {
204            // Check cancellation
205            if self.cancel.is_cancelled() {
206                let _ = self.write_json(&json!({"type": "proxy_cancelled"}));
207                self.drain_events()?;
208                return Ok(self.writer);
209            }
210
211            line.clear();
212            match self.reader.read_line(&mut line) {
213                Ok(0) => {
214                    // EOF
215                    self.drain_events()?;
216                    return Ok(self.writer);
217                }
218                Ok(_n) => {}
219                Err(e) => return Err(StreamingProxyError::Io(e.to_string())),
220            };
221
222            self.line_number += 1;
223            let trimmed = line.trim();
224
225            // Skip empty lines
226            if trimmed.is_empty() {
227                continue;
228            }
229
230            // Parse command
231            let command: SdkCommand = match serde_json::from_str(trimmed) {
232                Ok(cmd) => cmd,
233                Err(e) => {
234                    let error_resp = json!({
235                        "type": "proxy_error",
236                        "line_number": self.line_number,
237                        "error": format!("parse error: {e}"),
238                        "raw": trimmed,
239                    });
240                    self.write_json(&error_resp)?;
241                    continue;
242                }
243            };
244
245            // Dispatch to handler
246            let tx = self.event_tx.clone();
247            let sink = move |event: ProxyEvent| {
248                // Apply backpressure: if channel full, drop the event
249                if tx.try_send(event).is_err() {
250                    tracing::warn!("proxy event channel full, dropping event");
251                }
252            };
253
254            let response = self.handler.handle_command(command, &sink);
255            self.write_json(
256                &serde_json::to_value(&response)
257                    .unwrap_or(json!({"type":"response","success":false})),
258            )?;
259
260            // Drain any events the handler emitted
261            self.drain_events()?;
262
263            // Check for quit
264            if response.command == "quit" {
265                return Ok(self.writer);
266            }
267        }
268    }
269
270    /// Drain all buffered events and write them as JSONL.
271    fn drain_events(&mut self) -> Result<(), StreamingProxyError> {
272        while let Ok(event) = self.event_rx.try_recv() {
273            match event {
274                ProxyEvent::Agent(mut value) => {
275                    if let Some(ref redactor) = self.redactor {
276                        value = redactor.redact(&value);
277                    }
278                    self.write_json(&value)?;
279                }
280            }
281        }
282        Ok(())
283    }
284
285    /// Write a JSON value as a single JSONL line.
286    fn write_json(&mut self, value: &serde_json::Value) -> Result<(), StreamingProxyError> {
287        let mut line = serde_json::to_string(value).unwrap_or_else(|_| {
288            r#"{"type":"proxy_error","error":"serialization failed"}"#.to_owned()
289        });
290        line.push('\n');
291        self.writer
292            .write_all(line.as_bytes())
293            .map_err(|e| StreamingProxyError::Io(e.to_string()))?;
294        self.writer
295            .flush()
296            .map_err(|e| StreamingProxyError::Io(e.to_string()))?;
297        Ok(())
298    }
299}
300
301// ---------------------------------------------------------------------------
302// SecretRedactor
303// ---------------------------------------------------------------------------
304
305/// Redacts common secret patterns from JSON event payloads.
306///
307/// # Default patterns
308///
309/// The default instance matches:
310/// - API keys in values: `sk-ant-*`, `sk-*`
311/// - JWT/Bearer tokens in values: `eyJ*`
312/// - Sensitive JSON field names: `password`, `secret`, `token`, `api_key`,
313///   `apikey`, `private_key`, `access_token`, `refresh_token`
314///
315/// Custom patterns can be provided via [`SecretRedactor::new`].
316#[derive(Debug, Clone)]
317pub struct SecretRedactor {
318    /// Regex-like patterns for value matching (applied to string values).
319    value_patterns: Vec<String>,
320    /// Compiled value patterns.
321    value_regexes: Vec<regex::Regex>,
322    /// Field names whose values should always be redacted.
323    sensitive_fields: Vec<String>,
324}
325
326impl Default for SecretRedactor {
327    fn default() -> Self {
328        let value_patterns = vec![
329            // Anthropic API keys
330            r"sk-ant-[a-zA-Z0-9]{20,}".to_owned(),
331            // OpenAI-style API keys
332            r"sk-[a-zA-Z0-9]{20,}".to_owned(),
333            // JWT/Bearer tokens (eyJ header plus two token segments)
334            r"eyJ[a-zA-Z0-9_-]{10,}\.[a-zA-Z0-9_-]+\.[a-zA-Z0-9_-]+".to_owned(),
335        ];
336
337        Self {
338            value_regexes: compile_value_patterns(&value_patterns),
339            value_patterns,
340            sensitive_fields: vec![
341                "password".to_owned(),
342                "secret".to_owned(),
343                "token".to_owned(),
344                "api_key".to_owned(),
345                "apikey".to_owned(),
346                "private_key".to_owned(),
347                "access_token".to_owned(),
348                "refresh_token".to_owned(),
349            ],
350        }
351    }
352}
353
354fn compile_value_patterns(patterns: &[String]) -> Vec<regex::Regex> {
355    patterns
356        .iter()
357        .filter_map(|pattern| match regex::Regex::new(pattern) {
358            Ok(regex) => Some(regex),
359            Err(err) => {
360                tracing::warn!(pattern, error = %err, "invalid secret redaction pattern ignored");
361                None
362            }
363        })
364        .collect()
365}
366
367impl SecretRedactor {
368    /// Create a redactor with custom value patterns (regex strings).
369    ///
370    /// The default sensitive field names are still included.
371    pub fn new(value_patterns: Vec<String>) -> Self {
372        Self {
373            value_regexes: compile_value_patterns(&value_patterns),
374            value_patterns,
375            sensitive_fields: Self::default().sensitive_fields,
376        }
377    }
378
379    /// Return the active value patterns.
380    pub fn patterns(&self) -> &[String] {
381        &self.value_patterns
382    }
383
384    /// Redact secrets from a JSON value, returning a cleaned copy.
385    pub fn redact(&self, value: &serde_json::Value) -> serde_json::Value {
386        self.redact_value(value)
387    }
388
389    fn redact_value(&self, value: &serde_json::Value) -> serde_json::Value {
390        match value {
391            serde_json::Value::Object(map) => {
392                let mut new_map = serde_json::Map::new();
393                for (k, v) in map {
394                    if self.is_sensitive_field(k) {
395                        new_map.insert(
396                            k.clone(),
397                            serde_json::Value::String("[REDACTED]".to_owned()),
398                        );
399                    } else {
400                        new_map.insert(k.clone(), self.redact_value(v));
401                    }
402                }
403                serde_json::Value::Object(new_map)
404            }
405            serde_json::Value::Array(arr) => {
406                serde_json::Value::Array(arr.iter().map(|v| self.redact_value(v)).collect())
407            }
408            serde_json::Value::String(s) => {
409                if self.matches_value_pattern(s) {
410                    serde_json::Value::String("[REDACTED]".to_owned())
411                } else {
412                    serde_json::Value::String(s.clone())
413                }
414            }
415            other => other.clone(),
416        }
417    }
418
419    fn is_sensitive_field(&self, name: &str) -> bool {
420        let lower = name.to_lowercase();
421        self.sensitive_fields.iter().any(|f| f == &lower)
422    }
423
424    fn matches_value_pattern(&self, value: &str) -> bool {
425        self.value_regexes.iter().any(|regex| regex.is_match(value))
426    }
427}
428
429// ---------------------------------------------------------------------------
430// Error type
431// ---------------------------------------------------------------------------
432
433/// Errors from the streaming proxy.
434#[derive(Debug)]
435pub enum StreamingProxyError {
436    /// I/O error (read or write failure).
437    Io(String),
438}
439
440impl std::fmt::Display for StreamingProxyError {
441    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
442        match self {
443            Self::Io(msg) => write!(f, "proxy I/O error: {msg}"),
444        }
445    }
446}
447
448impl std::error::Error for StreamingProxyError {}