Skip to main content

agnt_net/
backend.rs

1use agnt_core::{BackendError, FunctionCall, LlmBackend, Message, ToolCall};
2use serde_json::{json, Value};
3use std::io::{BufRead, BufReader, Read};
4use std::sync::Arc;
5use std::thread;
6use std::time::{Duration, SystemTime, UNIX_EPOCH};
7
8#[derive(Debug, Clone, Copy, PartialEq, Eq)]
9pub enum Kind {
10    Openai,
11    Anthropic,
12}
13
14/// A multi-provider LLM backend.
15///
16/// Supports Ollama (via OpenAI-compatible API), OpenAI, and Anthropic.
17/// All three providers use the same internal `Message`/`ToolCall` format —
18/// Anthropic's content-block schema is translated at the wire boundary.
19///
20/// # Example
21///
22/// ```no_run
23/// use agnt_net::Backend;
24///
25/// let ollama = Backend::ollama("gemma4:e4b");
26/// let openai = Backend::openai("gpt-4o-mini", "sk-...");
27/// let anthropic = Backend::anthropic("claude-sonnet-4-6", "sk-ant-...");
28/// ```
29#[derive(Clone)]
30pub struct Backend {
31    /// Which provider schema to use on the wire.
32    pub kind: Kind,
33    /// Base URL for the provider's API.
34    pub base_url: String,
35    /// Optional API key. `None` for local Ollama.
36    api_key: Option<String>,
37    /// Model identifier passed in every request.
38    pub model: String,
39    /// Optional dedicated ureq Agent. When `None`, the process-wide shared
40    /// Agent (with default timeouts) is used.
41    agent: Option<Arc<ureq::Agent>>,
42}
43
44impl std::fmt::Debug for Backend {
45    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46        let kind = match self.kind {
47            Kind::Openai => "Openai",
48            Kind::Anthropic => "Anthropic",
49        };
50        f.debug_struct("Backend")
51            .field("kind", &kind)
52            .field("base_url", &self.base_url)
53            .field("api_key", &"<redacted>")
54            .field("model", &self.model)
55            .field("agent", &self.agent.as_ref().map(|_| "<custom>"))
56            .finish()
57    }
58}
59
60impl Backend {
61    /// Create a backend pointing at a local Ollama server.
62    ///
63    /// Uses `http://localhost:11434/v1` by default (the OpenAI-compatible endpoint).
64    pub fn ollama(model: &str) -> Self {
65        Self {
66            kind: Kind::Openai,
67            base_url: "http://localhost:11434/v1".into(),
68            api_key: None,
69            model: model.into(),
70            agent: None,
71        }
72    }
73    /// Create a backend for OpenAI's API.
74    pub fn openai(model: &str, api_key: &str) -> Self {
75        Self {
76            kind: Kind::Openai,
77            base_url: "https://api.openai.com/v1".into(),
78            api_key: Some(api_key.into()),
79            model: model.into(),
80            agent: None,
81        }
82    }
83    /// Create a backend for Anthropic's native API.
84    ///
85    /// Message format is automatically translated to Anthropic's content-block
86    /// schema at the wire boundary — you still work with the OpenAI-style
87    /// [`Message`] type internally.
88    pub fn anthropic(model: &str, api_key: &str) -> Self {
89        Self {
90            kind: Kind::Anthropic,
91            base_url: "https://api.anthropic.com/v1".into(),
92            api_key: Some(api_key.into()),
93            model: model.into(),
94            agent: None,
95        }
96    }
97
98    /// Override the HTTP timeouts for this backend instance.
99    ///
100    /// Builds a fresh ureq Agent with the supplied connect/read timeouts and
101    /// attaches it to this [`Backend`]. Subsequent requests made via this
102    /// instance will use the custom Agent instead of the process-wide shared
103    /// one.
104    ///
105    /// Returns an error if TLS initialization fails.
106    pub fn with_timeouts(mut self, connect: Duration, read: Duration) -> Result<Self, String> {
107        let agent = crate::http::build_agent(connect, read)?;
108        self.agent = Some(Arc::new(agent));
109        Ok(self)
110    }
111
112    #[tracing::instrument(
113        skip(self, messages, tools, sink),
114        fields(
115            kind = ?self.kind,
116            model = %self.model,
117            message_count = messages.len(),
118            streaming = sink.is_some(),
119        )
120    )]
121    pub fn chat(
122        &self,
123        messages: &[Message],
124        tools: &Value,
125        sink: Option<&mut dyn FnMut(&str)>,
126    ) -> Result<Message, String> {
127        match self.kind {
128            Kind::Openai => self.chat_openai(messages, tools, sink),
129            Kind::Anthropic => self.chat_anthropic(messages, tools, sink),
130        }
131    }
132
133    fn build_request(&self, url: &str) -> Result<ureq::Request, String> {
134        let agent: &ureq::Agent = match &self.agent {
135            Some(a) => a.as_ref(),
136            None => crate::http::agent()?,
137        };
138        let mut req = agent.post(url).set("Content-Type", "application/json");
139        if let Some(k) = &self.api_key {
140            match self.kind {
141                Kind::Openai => {
142                    req = req.set("Authorization", &format!("Bearer {}", k));
143                }
144                Kind::Anthropic => {
145                    req = req
146                        .set("x-api-key", k)
147                        .set("anthropic-version", "2023-06-01");
148                }
149            }
150        }
151        Ok(req)
152    }
153
154    fn chat_openai(
155        &self,
156        messages: &[Message],
157        tools: &Value,
158        sink: Option<&mut dyn FnMut(&str)>,
159    ) -> Result<Message, String> {
160        let url = format!("{}/chat/completions", self.base_url);
161        let mut body = json!({ "model": self.model, "messages": messages });
162        if let Some(arr) = tools.as_array() {
163            if !arr.is_empty() {
164                body["tools"] = tools.clone();
165            }
166        }
167        if sink.is_some() {
168            body["stream"] = Value::Bool(true);
169        }
170
171        // Serialize the body exactly once before entering the retry loop so
172        // we don't clone a fresh JSON `Value` on every attempt.
173        let body_bytes =
174            serde_json::to_vec(&body).map_err(|e| format!("encode body: {}", e))?;
175        let body_slice: &[u8] = &body_bytes;
176
177        let resp = with_retry(5, || {
178            self.build_request(&url)?
179                .send_bytes(body_slice)
180                .map_err(RetryError::from)
181        })?;
182
183        if let Some(sink) = sink {
184            parse_openai_stream(resp.into_reader(), sink)
185        } else {
186            let v: Value = resp.into_json().map_err(|e| format!("decode: {}", e))?;
187            let msg = v
188                .get("choices")
189                .and_then(|c| c.get(0))
190                .and_then(|c| c.get("message"))
191                .ok_or_else(|| format!("no message: {}", v))?;
192            serde_json::from_value(msg.clone()).map_err(|e| format!("parse: {}", e))
193        }
194    }
195
196    fn chat_anthropic(
197        &self,
198        messages: &[Message],
199        tools: &Value,
200        sink: Option<&mut dyn FnMut(&str)>,
201    ) -> Result<Message, String> {
202        let url = format!("{}/messages", self.base_url);
203        let (system, msgs) = to_anthropic_messages(messages);
204        let mut body = json!({
205            "model": self.model,
206            "messages": msgs,
207            "max_tokens": 4096,
208        });
209        if !system.is_empty() {
210            body["system"] = Value::String(system);
211        }
212        if let Some(arr) = tools.as_array() {
213            if !arr.is_empty() {
214                let conv: Vec<Value> = arr
215                    .iter()
216                    .map(|t| {
217                        let f = &t["function"];
218                        json!({
219                            "name": f["name"],
220                            "description": f["description"],
221                            "input_schema": f["parameters"],
222                        })
223                    })
224                    .collect();
225                body["tools"] = Value::Array(conv);
226            }
227        }
228        if sink.is_some() {
229            body["stream"] = Value::Bool(true);
230        }
231
232        let body_bytes =
233            serde_json::to_vec(&body).map_err(|e| format!("encode body: {}", e))?;
234        let body_slice: &[u8] = &body_bytes;
235
236        let resp = with_retry(5, || {
237            self.build_request(&url)?
238                .send_bytes(body_slice)
239                .map_err(RetryError::from)
240        })?;
241
242        if let Some(sink) = sink {
243            parse_anthropic_stream(resp.into_reader(), sink)
244        } else {
245            let v: Value = resp.into_json().map_err(|e| format!("decode: {}", e))?;
246            from_anthropic_response(&v)
247        }
248    }
249}
250
251/// Internal error type for the retry loop — distinguishes a failure to build
252/// the request (e.g. TLS init) from a ureq transport/status error.
253enum RetryError {
254    Build(String),
255    Ureq(ureq::Error),
256}
257
258impl From<ureq::Error> for RetryError {
259    fn from(e: ureq::Error) -> Self {
260        RetryError::Ureq(e)
261    }
262}
263
264impl From<String> for RetryError {
265    fn from(e: String) -> Self {
266        RetryError::Build(e)
267    }
268}
269
270/// Strip sensitive headers from an error body before it bubbles up.
271///
272/// Upstream providers sometimes echo request headers back in verbose error
273/// payloads. We redact the two we know carry secrets so they never end up in
274/// logs.
275fn redact_secrets(s: &str) -> String {
276    // Line-based replacement is good enough for SSE-style payloads; we also
277    // handle single-line JSON with inline header strings.
278    let mut out = String::with_capacity(s.len());
279    for line in s.split_inclusive('\n') {
280        let lower = line.to_ascii_lowercase();
281        if lower.contains("authorization") || lower.contains("x-api-key") {
282            // Drop bearer token / api key values after the header name.
283            // A conservative redaction: replace the whole line.
284            out.push_str("[redacted header]\n");
285        } else {
286            out.push_str(line);
287        }
288    }
289    out
290}
291
292/// Simple xorshift64* PRNG seeded from the wall clock. We only use this for
293/// retry jitter so quality is not critical — we just want each process (and
294/// ideally each retry) to pick a different multiplier.
295fn xorshift_jitter(state: &mut u64) -> f64 {
296    if *state == 0 {
297        *state = SystemTime::now()
298            .duration_since(UNIX_EPOCH)
299            .map(|d| d.as_nanos() as u64)
300            .unwrap_or(0x9E3779B97F4A7C15)
301            .wrapping_add(1);
302    }
303    let mut x = *state;
304    x ^= x << 13;
305    x ^= x >> 7;
306    x ^= x << 17;
307    *state = x;
308    // Map into [-1.0, 1.0).
309    let frac = ((x >> 11) as f64) / ((1u64 << 53) as f64);
310    frac * 2.0 - 1.0
311}
312
313/// Apply ±20% jitter to a base delay in milliseconds.
314fn jittered(base_ms: u64, rng_state: &mut u64) -> u64 {
315    let j = xorshift_jitter(rng_state); // [-1, 1)
316    let delta = (base_ms as f64) * 0.20 * j;
317    let adjusted = (base_ms as f64 + delta).max(0.0);
318    adjusted as u64
319}
320
321fn with_retry<F>(max: u32, mut f: F) -> Result<ureq::Response, String>
322where
323    F: FnMut() -> Result<ureq::Response, RetryError>,
324{
325    if max == 0 {
326        return Err("with_retry: max must be >= 1".into());
327    }
328    let mut base_delay = 500u64;
329    let mut rng_state = 0u64;
330    let mut last_err: Option<String> = None;
331    for i in 0..max {
332        match f() {
333            Ok(r) => return Ok(r),
334            Err(RetryError::Build(e)) => {
335                // A build failure (e.g. TLS init) is not worth retrying.
336                return Err(e);
337            }
338            Err(RetryError::Ureq(ureq::Error::Status(code, r))) => {
339                let retryable = code == 429 || code >= 500;
340                if retryable && i + 1 < max {
341                    let sleep_ms = jittered(base_delay, &mut rng_state);
342                    thread::sleep(Duration::from_millis(sleep_ms));
343                    base_delay = (base_delay * 2).min(8000);
344                    continue;
345                }
346                let body = r.into_string().unwrap_or_default();
347                return Err(redact_secrets(&format!("http {}: {}", code, body)));
348            }
349            Err(RetryError::Ureq(ureq::Error::Transport(t))) => {
350                last_err = Some(format!("transport: {}", t));
351                if i + 1 < max {
352                    let sleep_ms = jittered(base_delay, &mut rng_state);
353                    thread::sleep(Duration::from_millis(sleep_ms));
354                    base_delay = (base_delay * 2).min(8000);
355                    continue;
356                }
357                return Err(redact_secrets(last_err.as_deref().unwrap_or("transport: unknown")));
358            }
359        }
360    }
361    Err(redact_secrets(
362        last_err.as_deref().unwrap_or("with_retry: exhausted"),
363    ))
364}
365
366fn to_anthropic_messages(msgs: &[Message]) -> (String, Vec<Value>) {
367    let mut system = String::new();
368    let mut out: Vec<Value> = Vec::new();
369    for m in msgs {
370        match m.role.as_str() {
371            "system" => {
372                if !system.is_empty() {
373                    system.push('\n');
374                }
375                if let Some(c) = &m.content {
376                    system.push_str(c);
377                }
378            }
379            "user" => {
380                out.push(json!({
381                    "role": "user",
382                    "content": m.content.clone().unwrap_or_default()
383                }));
384            }
385            "assistant" => {
386                let mut blocks: Vec<Value> = Vec::new();
387                if let Some(c) = &m.content {
388                    if !c.is_empty() {
389                        blocks.push(json!({"type":"text","text": c}));
390                    }
391                }
392                if let Some(tcs) = &m.tool_calls {
393                    for tc in tcs {
394                        let input: Value =
395                            serde_json::from_str(&tc.function.arguments).unwrap_or(json!({}));
396                        blocks.push(json!({
397                            "type": "tool_use",
398                            "id": tc.id,
399                            "name": tc.function.name,
400                            "input": input,
401                        }));
402                    }
403                }
404                out.push(json!({ "role": "assistant", "content": blocks }));
405            }
406            "tool" => {
407                let block = json!({
408                    "type": "tool_result",
409                    "tool_use_id": m.tool_call_id.clone().unwrap_or_default(),
410                    "content": m.content.clone().unwrap_or_default(),
411                });
412                if let Some(last) = out.last_mut() {
413                    if last["role"] == "user" {
414                        if last["content"].is_array() {
415                            last["content"].as_array_mut().unwrap().push(block);
416                            continue;
417                        } else {
418                            let existing = last["content"].clone();
419                            let mut arr: Vec<Value> = Vec::new();
420                            if existing.is_string()
421                                && !existing.as_str().unwrap_or("").is_empty()
422                            {
423                                arr.push(json!({"type":"text","text": existing}));
424                            }
425                            arr.push(block);
426                            last["content"] = Value::Array(arr);
427                            continue;
428                        }
429                    }
430                }
431                out.push(json!({ "role": "user", "content": [block] }));
432            }
433            _ => {}
434        }
435    }
436    (system, out)
437}
438
439fn from_anthropic_response(v: &Value) -> Result<Message, String> {
440    let content = v
441        .get("content")
442        .and_then(|c| c.as_array())
443        .ok_or_else(|| format!("no content: {}", v))?;
444    let mut text = String::new();
445    let mut tool_calls: Vec<ToolCall> = Vec::new();
446    for block in content {
447        match block.get("type").and_then(|t| t.as_str()) {
448            Some("text") => {
449                if let Some(t) = block.get("text").and_then(|t| t.as_str()) {
450                    text.push_str(t);
451                }
452            }
453            Some("tool_use") => {
454                let id = block
455                    .get("id")
456                    .and_then(|i| i.as_str())
457                    .unwrap_or("")
458                    .to_string();
459                let name = block
460                    .get("name")
461                    .and_then(|i| i.as_str())
462                    .unwrap_or("")
463                    .to_string();
464                let input = block.get("input").cloned().unwrap_or(Value::Null);
465                tool_calls.push(ToolCall {
466                    id,
467                    call_type: "function".into(),
468                    function: FunctionCall {
469                        name,
470                        arguments: input.to_string(),
471                    },
472                });
473            }
474            _ => {}
475        }
476    }
477    Ok(Message {
478        role: "assistant".into(),
479        content: if text.is_empty() { None } else { Some(text) },
480        tool_calls: if tool_calls.is_empty() {
481            None
482        } else {
483            Some(tool_calls)
484        },
485        tool_call_id: None,
486        name: None,
487    })
488}
489
490/// Read one line from the reader into `buf` (cleared first), stripping a
491/// trailing `\n` and optional `\r`. Returns `Ok(false)` at EOF.
492fn read_sse_line<R: BufRead>(reader: &mut R, buf: &mut String) -> std::io::Result<bool> {
493    buf.clear();
494    let n = reader.read_line(buf)?;
495    if n == 0 {
496        return Ok(false);
497    }
498    if buf.ends_with('\n') {
499        buf.pop();
500        if buf.ends_with('\r') {
501            buf.pop();
502        }
503    }
504    Ok(true)
505}
506
507/// Fuzzing / integration-test hook for the OpenAI SSE parser.
508///
509/// Not part of the stable API — `#[doc(hidden)]` and behind the
510/// `fuzz-api` feature so libfuzzer targets can exercise the stream
511/// parser without the parser itself becoming `pub`. See
512/// `fuzz/fuzz_targets/fuzz_openai_sse.rs`.
513#[doc(hidden)]
514#[cfg(feature = "fuzz-api")]
515pub fn _fuzz_parse_openai_stream(bytes: &[u8]) -> Result<Message, String> {
516    let mut sink = |_s: &str| {};
517    parse_openai_stream(bytes, &mut sink)
518}
519
520/// Fuzzing hook for the Anthropic SSE parser. See
521/// [`_fuzz_parse_openai_stream`].
522#[doc(hidden)]
523#[cfg(feature = "fuzz-api")]
524pub fn _fuzz_parse_anthropic_stream(bytes: &[u8]) -> Result<Message, String> {
525    let mut sink = |_s: &str| {};
526    parse_anthropic_stream(bytes, &mut sink)
527}
528
529fn parse_openai_stream<R: Read>(
530    resp: R,
531    sink: &mut dyn FnMut(&str),
532) -> Result<Message, String> {
533    // Generic over `R: Read` so tests can feed a `&[u8]` and production can
534    // pass `ureq::Response::into_reader()`.
535    let mut reader = BufReader::new(resp);
536    let mut text = String::new();
537    let mut tool_calls: Vec<ToolCall> = Vec::new();
538    let mut line = String::new();
539
540    while read_sse_line(&mut reader, &mut line).map_err(|e| format!("stream: {}", e))? {
541        let data = match line.strip_prefix("data: ") {
542            Some(d) => d,
543            None => continue,
544        };
545        if data == "[DONE]" {
546            break;
547        }
548        let chunk: Value = match serde_json::from_str(data) {
549            Ok(v) => v,
550            Err(_) => continue,
551        };
552        let delta = match chunk
553            .get("choices")
554            .and_then(|c| c.get(0))
555            .and_then(|c| c.get("delta"))
556        {
557            Some(d) => d,
558            None => continue,
559        };
560        if let Some(c) = delta.get("content").and_then(|c| c.as_str()) {
561            text.push_str(c);
562            sink(c);
563        }
564        if let Some(tcs) = delta.get("tool_calls").and_then(|t| t.as_array()) {
565            for tc in tcs {
566                let idx = tc.get("index").and_then(|i| i.as_u64()).unwrap_or(0) as usize;
567                while tool_calls.len() <= idx {
568                    tool_calls.push(ToolCall {
569                        id: String::new(),
570                        call_type: "function".into(),
571                        function: FunctionCall {
572                            name: String::new(),
573                            arguments: String::new(),
574                        },
575                    });
576                }
577                let slot = &mut tool_calls[idx];
578                if let Some(id) = tc.get("id").and_then(|i| i.as_str()) {
579                    if !id.is_empty() {
580                        slot.id = id.to_string();
581                    }
582                }
583                if let Some(f) = tc.get("function") {
584                    if let Some(n) = f.get("name").and_then(|n| n.as_str()) {
585                        slot.function.name.push_str(n);
586                    }
587                    if let Some(a) = f.get("arguments").and_then(|a| a.as_str()) {
588                        slot.function.arguments.push_str(a);
589                    }
590                }
591            }
592        }
593    }
594
595    Ok(Message {
596        role: "assistant".into(),
597        content: if text.is_empty() { None } else { Some(text) },
598        tool_calls: if tool_calls.is_empty() {
599            None
600        } else {
601            Some(tool_calls)
602        },
603        tool_call_id: None,
604        name: None,
605    })
606}
607
608fn parse_anthropic_stream<R: Read>(
609    resp: R,
610    sink: &mut dyn FnMut(&str),
611) -> Result<Message, String> {
612    let mut reader = BufReader::new(resp);
613    let mut text = String::new();
614    let mut blocks: Vec<(String, String, String, String)> = Vec::new();
615    let mut line = String::new();
616
617    while read_sse_line(&mut reader, &mut line).map_err(|e| format!("stream: {}", e))? {
618        let data = match line.strip_prefix("data: ") {
619            Some(d) => d,
620            None => continue,
621        };
622        let ev: Value = match serde_json::from_str(data) {
623            Ok(v) => v,
624            Err(_) => continue,
625        };
626        let t = ev.get("type").and_then(|t| t.as_str()).unwrap_or("");
627        match t {
628            "content_block_start" => {
629                let idx = ev.get("index").and_then(|i| i.as_u64()).unwrap_or(0) as usize;
630                // Borrow the content_block in place instead of cloning it.
631                let block = ev.get("content_block");
632                let btype = block
633                    .and_then(|b| b.get("type"))
634                    .and_then(|t| t.as_str())
635                    .unwrap_or("")
636                    .to_string();
637                let id = block
638                    .and_then(|b| b.get("id"))
639                    .and_then(|i| i.as_str())
640                    .unwrap_or("")
641                    .to_string();
642                let name = block
643                    .and_then(|b| b.get("name"))
644                    .and_then(|i| i.as_str())
645                    .unwrap_or("")
646                    .to_string();
647                while blocks.len() <= idx {
648                    blocks.push((String::new(), String::new(), String::new(), String::new()));
649                }
650                blocks[idx] = (btype, id, name, String::new());
651            }
652            "content_block_delta" => {
653                let idx = ev.get("index").and_then(|i| i.as_u64()).unwrap_or(0) as usize;
654                // Borrow the delta in place instead of cloning it.
655                let delta = ev.get("delta");
656                let dtype = delta
657                    .and_then(|d| d.get("type"))
658                    .and_then(|t| t.as_str())
659                    .unwrap_or("");
660                match dtype {
661                    "text_delta" => {
662                        if let Some(t) =
663                            delta.and_then(|d| d.get("text")).and_then(|t| t.as_str())
664                        {
665                            text.push_str(t);
666                            sink(t);
667                        }
668                    }
669                    "input_json_delta" => {
670                        if let Some(p) = delta
671                            .and_then(|d| d.get("partial_json"))
672                            .and_then(|p| p.as_str())
673                        {
674                            if let Some(slot) = blocks.get_mut(idx) {
675                                slot.3.push_str(p);
676                            }
677                        }
678                    }
679                    _ => {}
680                }
681            }
682            "message_stop" => break,
683            _ => {}
684        }
685    }
686
687    let mut tool_calls: Vec<ToolCall> = Vec::new();
688    for (btype, id, name, args) in blocks {
689        if btype == "tool_use" {
690            tool_calls.push(ToolCall {
691                id,
692                call_type: "function".into(),
693                function: FunctionCall {
694                    name,
695                    arguments: if args.is_empty() {
696                        "{}".into()
697                    } else {
698                        args
699                    },
700                },
701            });
702        }
703    }
704
705    Ok(Message {
706        role: "assistant".into(),
707        content: if text.is_empty() { None } else { Some(text) },
708        tool_calls: if tool_calls.is_empty() {
709            None
710        } else {
711            Some(tool_calls)
712        },
713        tool_call_id: None,
714        name: None,
715    })
716}
717
718impl LlmBackend for Backend {
719    fn model(&self) -> &str {
720        &self.model
721    }
722
723    fn chat(
724        &self,
725        messages: &[Message],
726        tools: &Value,
727        on_token: Option<&mut dyn FnMut(&str)>,
728    ) -> Result<Message, BackendError> {
729        // Delegate to the inherent method and map String errors into
730        // BackendError::Provider. Leg 2 refinements (error taxonomy at
731        // source) land in v0.2 Phase 1.
732        Backend::chat(self, messages, tools, on_token).map_err(BackendError::Provider)
733    }
734}
735
736#[cfg(test)]
737mod tests {
738    use super::*;
739
740    #[test]
741    fn debug_impl_redacts_api_key() {
742        let b = Backend::openai("gpt-4o-mini", "sk-super-secret-key");
743        let s = format!("{:?}", b);
744        assert!(s.contains("<redacted>"), "debug output: {}", s);
745        assert!(
746            !s.contains("sk-super-secret-key"),
747            "secret leaked in debug output: {}",
748            s
749        );
750    }
751
752    #[test]
753    fn redact_secrets_strips_auth_headers() {
754        let raw = "line1\nAuthorization: Bearer sk-xyz\nx-api-key: abc\nother\n";
755        let out = redact_secrets(raw);
756        assert!(!out.contains("sk-xyz"));
757        assert!(!out.contains("abc"));
758        assert!(out.contains("line1"));
759        assert!(out.contains("other"));
760    }
761
762    #[test]
763    fn with_retry_zero_max_returns_err_not_panic() {
764        let r: Result<ureq::Response, String> =
765            with_retry(0, || unreachable!("should not be called"));
766        assert!(r.is_err());
767        assert!(r.unwrap_err().contains("max must be >= 1"));
768    }
769
770    #[test]
771    fn with_retry_build_error_is_not_retried() {
772        let mut calls = 0u32;
773        let r: Result<ureq::Response, String> = with_retry(5, || {
774            calls += 1;
775            Err(RetryError::Build("tls init blew up".into()))
776        });
777        assert!(r.is_err());
778        assert_eq!(calls, 1, "build errors must not be retried");
779    }
780
781    #[test]
782    fn jitter_stays_within_bounds() {
783        let mut state = 1u64;
784        for _ in 0..1000 {
785            let j = jittered(1000, &mut state);
786            assert!(j <= 1200, "j={}", j);
787            // Lower bound: 1000 - 200 = 800, but floor is 0.
788            assert!(j >= 800, "j={}", j);
789        }
790    }
791
792    #[test]
793    fn openai_stream_parses_content_and_tool_call() {
794        let data = concat!(
795            "data: {\"choices\":[{\"delta\":{\"content\":\"hel\"}}]}\n",
796            "data: {\"choices\":[{\"delta\":{\"content\":\"lo\"}}]}\n",
797            "data: {\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"call_1\",\"function\":{\"name\":\"f\",\"arguments\":\"{\\\"x\\\":1}\"}}]}}]}\n",
798            "data: [DONE]\n",
799        );
800        let mut captured = String::new();
801        let msg = {
802            let mut sink = |s: &str| captured.push_str(s);
803            parse_openai_stream(data.as_bytes(), &mut sink).unwrap()
804        };
805        assert_eq!(captured, "hello");
806        assert_eq!(msg.content.as_deref(), Some("hello"));
807        let tcs = msg.tool_calls.expect("tool_calls");
808        assert_eq!(tcs.len(), 1);
809        assert_eq!(tcs[0].id, "call_1");
810        assert_eq!(tcs[0].function.name, "f");
811        assert_eq!(tcs[0].function.arguments, "{\"x\":1}");
812    }
813
814    #[test]
815    fn anthropic_stream_parses_text_and_tool_use() {
816        let data = concat!(
817            "data: {\"type\":\"content_block_start\",\"index\":0,\"content_block\":{\"type\":\"text\"}}\n",
818            "data: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\"hi\"}}\n",
819            "data: {\"type\":\"content_block_start\",\"index\":1,\"content_block\":{\"type\":\"tool_use\",\"id\":\"t1\",\"name\":\"lookup\"}}\n",
820            "data: {\"type\":\"content_block_delta\",\"index\":1,\"delta\":{\"type\":\"input_json_delta\",\"partial_json\":\"{\\\"q\\\":\\\"x\\\"}\"}}\n",
821            "data: {\"type\":\"message_stop\"}\n",
822        );
823        let mut captured = String::new();
824        let msg = {
825            let mut sink = |s: &str| captured.push_str(s);
826            parse_anthropic_stream(data.as_bytes(), &mut sink).unwrap()
827        };
828        assert_eq!(captured, "hi");
829        assert_eq!(msg.content.as_deref(), Some("hi"));
830        let tcs = msg.tool_calls.expect("tool_calls");
831        assert_eq!(tcs.len(), 1);
832        assert_eq!(tcs[0].id, "t1");
833        assert_eq!(tcs[0].function.name, "lookup");
834        assert_eq!(tcs[0].function.arguments, "{\"q\":\"x\"}");
835    }
836}