opencrabs 0.3.49

The autonomous, self-improving AI agent. Single Rust binary. Every channel. Install with: cargo install opencrabs
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
//! Fallback Provider
//!
//! Wraps a primary provider with an ordered list of fallbacks.
//! When a provider returns a rate-limit (or other retryable) error, the
//! next provider in the chain is tried. After a successful fallback the
//! chosen provider becomes **sticky** — subsequent calls skip the dead
//! primary entirely until the process exits, so a single 429 doesn't
//! cost 60s of retries on every following turn.

use super::error::{ProviderError, Result};
use super::r#trait::{Provider, ProviderStream};
use super::types::{LLMRequest, LLMResponse};
use async_trait::async_trait;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Description of a swap that just occurred — consumed once by the
/// caller (typically the agent service) so it can surface a UI alert.
///
/// Built by `FallbackProvider::promote` whenever the active index actually
/// changes, and handed out through `take_swap_event`, which empties the
/// slot it was stored in.
#[derive(Debug, Clone)]
pub struct SwapEvent {
    /// `name()` of the provider that was active before the swap.
    pub from_name: String,
    /// `default_model()` of the provider that was active before the swap.
    pub from_model: String,
    /// `name()` of the provider that became active.
    pub to_name: String,
    /// `default_model()` of the provider that became active.
    pub to_model: String,
    /// Why the swap happened, exactly as the caller of `promote` passed it.
    pub reason: String,
}

/// A provider that tries a chain of providers in order on failure.
///
/// `active` indexes into the chain: 0 = primary, 1..=fallbacks.len() = the
/// (n-1)-th fallback. After a successful swap, `active` advances and stays
/// there for the rest of the process — there is no automatic recovery
/// back to the original primary.
pub struct FallbackProvider {
    /// Chain index 0: the originally-configured provider.
    primary: Arc<dyn Provider>,
    /// Chain indices 1..: tried in this order once the active provider fails.
    fallbacks: Vec<Arc<dyn Provider>>,
    /// Chain index of the provider that calls are currently sent to first.
    active: AtomicUsize,
    /// Most recent swap not yet collected via `take_swap_event`; a newer
    /// swap overwrites an uncollected older one.
    pending_swap: Mutex<Option<SwapEvent>>,
}

impl FallbackProvider {
    /// Build a chain that starts on the primary (chain index 0) with no
    /// swap event pending.
    pub fn new(primary: Arc<dyn Provider>, fallbacks: Vec<Arc<dyn Provider>>) -> Self {
        Self {
            active: AtomicUsize::new(0),
            pending_swap: Mutex::new(None),
            primary,
            fallbacks,
        }
    }

    /// Build a chain whose starting index is derived from recorded provider
    /// health rather than always being the primary.
    ///
    /// When the primary has recent failures and a fallback has a recorded
    /// success, the chain starts directly on that fallback. This persists
    /// sticky fallbacks across process restarts.
    pub fn new_with_health(primary: Arc<dyn Provider>, fallbacks: Vec<Arc<dyn Provider>>) -> Self {
        let initial_index = Self::compute_health_start_index(&primary, &fallbacks);
        let mut chain = Self::new(primary, fallbacks);
        chain.active = AtomicUsize::new(initial_index);
        chain
    }

    /// Pick the chain index to start on, based on recorded health.
    ///
    /// Returns 0 (the primary) unless the primary has at least two
    /// consecutive failures on record *and* some fallback has a recorded
    /// success with fewer consecutive failures than the primary. Among the
    /// qualifying fallbacks the one with the largest `last_success` wins;
    /// on equal timestamps the earliest in the chain is kept.
    fn compute_health_start_index(
        primary: &Arc<dyn Provider>,
        fallbacks: &[Arc<dyn Provider>],
    ) -> usize {
        // No health record at all counts as zero failures.
        let primary_health = crate::config::health::get_health(primary.name());
        let primary_fails = match &primary_health {
            Some(record) => record.consecutive_failures,
            None => 0,
        };

        // A single failure is not enough to abandon the primary at startup.
        if primary_fails < 2 {
            return 0;
        }

        // Scan for the fallback whose last success is the most recent.
        let mut winner: Option<usize> = None;
        let mut newest_success: u64 = 0;

        for (position, candidate) in fallbacks.iter().enumerate() {
            let Some(record) = crate::config::health::get_health(candidate.name()) else {
                continue;
            };
            let Some(stamp) = record.last_success else {
                continue;
            };
            if stamp > newest_success && record.consecutive_failures < primary_fails {
                newest_success = stamp;
                // Chain indices are shifted by one: slot 0 is the primary.
                winner = Some(position + 1);
            }
        }

        let Some(chain_index) = winner else {
            return 0;
        };
        tracing::info!(
            "Health-aware startup: skipping unhealthy primary '{}' ({} failures), \
             starting at fallback index {}",
            primary.name(),
            primary_fails,
            chain_index
        );
        chain_index
    }

    /// Return a handle to whichever provider the sticky index points at:
    /// the primary for index 0, otherwise the matching fallback.
    fn active_provider(&self) -> Arc<dyn Provider> {
        match self.active.load(Ordering::Acquire) {
            0 => Arc::clone(&self.primary),
            slot => Arc::clone(&self.fallbacks[slot - 1]),
        }
    }

    /// Make chain index `new_idx` the active provider.
    ///
    /// If the index actually changed, a [`SwapEvent`] describing the move is
    /// logged and stored for the caller to surface in the UI. Promoting to
    /// the index that is already active does nothing.
    fn promote(&self, new_idx: usize, reason: &str) {
        let previous = self.active.swap(new_idx, Ordering::AcqRel);
        if previous == new_idx {
            return;
        }

        let from = match previous {
            0 => &self.primary,
            slot => &self.fallbacks[slot - 1],
        };
        let to = match new_idx {
            0 => &self.primary,
            slot => &self.fallbacks[slot - 1],
        };

        let event = SwapEvent {
            from_name: from.name().to_owned(),
            from_model: from.default_model().to_owned(),
            to_name: to.name().to_owned(),
            to_model: to.default_model().to_owned(),
            reason: reason.to_owned(),
        };
        tracing::warn!(
            "Sticky fallback: '{}/{}' → '{}/{}' (reason: {})",
            event.from_name,
            event.from_model,
            event.to_name,
            event.to_model,
            event.reason
        );

        // Best effort: if the lock is poisoned the event is simply dropped.
        if let Ok(mut pending) = self.pending_swap.lock() {
            pending.replace(event);
        }
    }

    /// Clone `request` for provider `fb`, swapping in `fb`'s default model
    /// when `fb` advertises a non-empty model list that lacks the requested
    /// model. An empty list is treated as "accepts anything".
    fn remap_request_for_fallback(fb: &dyn Provider, request: &LLMRequest) -> LLMRequest {
        let mut remapped = request.clone();
        let accepted = fb.supported_models();

        let model_is_accepted = accepted.iter().any(|candidate| candidate == &remapped.model);
        if accepted.is_empty() || model_is_accepted {
            return remapped;
        }

        let replacement = fb.default_model().to_string();
        tracing::info!(
            "Fallback '{}': model '{}' not supported — remapping to '{}'",
            fb.name(),
            remapped.model,
            replacement
        );
        remapped.model = replacement;
        remapped
    }

    /// Decide whether `err` justifies moving on to the next provider.
    ///
    /// Everything `is_retryable()` reports (rate-limit, 5xx, timeout) moves
    /// on, and so do the failures below, because a different provider — after
    /// model remapping — may still be able to serve the request.
    fn should_try_next(err: &ProviderError) -> bool {
        err.is_retryable()
            || matches!(
                err,
                // The model is unknown here; a fallback may have it.
                ProviderError::ModelNotFound(_)
                    // Parsed model/parameter errors.
                    | ProviderError::InvalidRequest(_)
                    // Dead credentials (expired OAuth, revoked key): the
                    // provider is unusable until re-authenticated.
                    | ProviderError::InvalidApiKey
                    // 400: model/parameter not valid for this provider.
                    // 401/403: auth failure, same reasoning as above.
                    | ProviderError::ApiError {
                        status: 400 | 401 | 403,
                        ..
                    }
            )
    }
}

#[async_trait]
impl Provider for FallbackProvider {
    async fn complete(&self, request: LLMRequest) -> Result<LLMResponse> {
        let start_idx = self.active.load(Ordering::Acquire);
        let mut last_err: Option<ProviderError>;

        // Try the currently-active provider first.
        // Always remap — after a restart the sticky index resets to 0 but
        // the request may still carry a model from a previously-active
        // provider (e.g. "openrouter/elephant-alpha" sent to Qwen).
        let active = self.active_provider();
        let active_request = Self::remap_request_for_fallback(active.as_ref(), &request);
        match active.complete(active_request).await {
            Ok(resp) => return Ok(resp),
            Err(e) if !Self::should_try_next(&e) => return Err(e),
            Err(e) => {
                tracing::warn!(
                    "Active provider '{}' failed: {} — trying next in chain",
                    active.name(),
                    e
                );
                last_err = Some(e);
            }
        }

        // Try subsequent fallbacks (skip ones already exhausted by the
        // sticky pointer — start_idx already accounts for them)
        for offset in start_idx..self.fallbacks.len() {
            let fb = &self.fallbacks[offset];
            let fb_request = Self::remap_request_for_fallback(fb.as_ref(), &request);
            match fb.complete(fb_request).await {
                Ok(resp) => {
                    self.promote(
                        offset + 1,
                        last_err
                            .as_ref()
                            .map(super::error::user_facing_reason)
                            .unwrap_or_else(|| "unknown".into())
                            .as_str(),
                    );
                    return Ok(resp);
                }
                Err(e) => {
                    tracing::warn!("Fallback provider '{}' failed: {}", fb.name(), e);
                    last_err = Some(e);
                }
            }
        }

        Err(last_err.unwrap_or_else(|| {
            ProviderError::Internal("FallbackProvider: all providers exhausted".into())
        }))
    }

    async fn stream(&self, request: LLMRequest) -> Result<ProviderStream> {
        let start_idx = self.active.load(Ordering::Acquire);
        let mut last_err: Option<ProviderError>;

        // Try the currently-active provider first.
        // Always remap — see complete() comment.
        let active = self.active_provider();
        let active_request = Self::remap_request_for_fallback(active.as_ref(), &request);
        match active.stream(active_request).await {
            Ok(stream) => return Ok(stream),
            Err(e) if !Self::should_try_next(&e) => return Err(e),
            Err(e) => {
                tracing::warn!(
                    "Active provider '{}' stream failed: {} — trying next in chain",
                    active.name(),
                    e
                );
                last_err = Some(e);
            }
        }

        // Try subsequent fallbacks
        for offset in start_idx..self.fallbacks.len() {
            let fb = &self.fallbacks[offset];
            let fb_request = Self::remap_request_for_fallback(fb.as_ref(), &request);
            match fb.stream(fb_request).await {
                Ok(stream) => {
                    self.promote(
                        offset + 1,
                        last_err
                            .as_ref()
                            .map(super::error::user_facing_reason)
                            .unwrap_or_else(|| "unknown".into())
                            .as_str(),
                    );
                    return Ok(stream);
                }
                Err(e) => {
                    tracing::warn!("Fallback provider '{}' stream failed: {}", fb.name(), e);
                    last_err = Some(e);
                }
            }
        }

        Err(last_err.unwrap_or_else(|| {
            ProviderError::Internal("FallbackProvider: all providers exhausted".into())
        }))
    }

    fn supports_streaming(&self) -> bool {
        self.primary.supports_streaming()
    }

    fn supports_tools(&self) -> bool {
        self.primary.supports_tools()
    }

    fn supports_vision(&self) -> bool {
        self.primary.supports_vision()
    }

    fn cli_handles_tools(&self) -> bool {
        self.primary.cli_handles_tools()
    }

    fn cli_manages_context(&self) -> bool {
        self.primary.cli_manages_context()
    }

    fn name(&self) -> &str {
        // Persistence and config-display name stays as the originally-configured
        // primary, even after a sticky swap. Use `active_subprovider_name()` for
        // the live indicator.
        self.primary.name()
    }

    fn is_fallback_chain(&self) -> bool {
        true
    }

    fn base_url(&self) -> Option<&str> {
        // Forward the primary's base_url so features that identify specific
        // proxies by URL (e.g. dialagram gaslighting strip) keep working even
        // when the provider is wrapped in a fallback chain.
        self.primary.base_url()
    }

    fn default_model(&self) -> &str {
        self.primary.default_model()
    }

    fn supported_models(&self) -> Vec<String> {
        self.primary.supported_models()
    }

    async fn fetch_models(&self) -> Vec<String> {
        self.primary.fetch_models().await
    }

    fn context_window(&self, model: &str) -> Option<u32> {
        self.primary.context_window(model)
    }

    fn configured_context_window(&self) -> Option<u32> {
        self.primary.configured_context_window()
    }

    fn calculate_cost(&self, model: &str, input_tokens: u32, output_tokens: u32) -> f64 {
        self.primary
            .calculate_cost(model, input_tokens, output_tokens)
    }

    fn force_next_fallback(&self, reason: &str) -> bool {
        let current = self.active.load(Ordering::Acquire);
        let next = current + 1;
        let total = 1 + self.fallbacks.len(); // primary + fallbacks
        if next >= total {
            tracing::warn!(
                "force_next_fallback: no more fallbacks (current={}, total={})",
                current,
                total,
            );
            return false;
        }
        self.promote(next, reason);
        tracing::info!(
            "force_next_fallback: promoted index {} → {} (reason: {})",
            current,
            next,
            reason,
        );
        true
    }

    fn take_swap_event(&self) -> Option<SwapEvent> {
        self.pending_swap.lock().ok().and_then(|mut s| s.take())
    }

    fn take_retry_notices(&self) -> Vec<(u32, u32, String)> {
        // Aggregate retries from the primary and every fallback that was
        // tried this turn, in chain order, so the user sees the full
        // resilience sequence ("⏳ Retry 2/4 — dialagram …" then the
        // fallback's own retries).
        let mut out = self.primary.take_retry_notices();
        for fb in &self.fallbacks {
            out.extend(fb.take_retry_notices());
        }
        out
    }

    fn active_subprovider_name(&self) -> Option<String> {
        let idx = self.active.load(Ordering::Acquire);
        if idx == 0 {
            None
        } else {
            Some(self.fallbacks[idx - 1].name().to_string())
        }
    }

    fn active_subprovider_model(&self) -> Option<String> {
        let idx = self.active.load(Ordering::Acquire);
        if idx == 0 {
            None
        } else {
            Some(self.fallbacks[idx - 1].default_model().to_string())
        }
    }
}