// ucp-agent 0.1.18
// Agent graph traversal system for UCP knowledge graphs.
//! Safety mechanisms: limits, circuit breakers, and guards.

use crate::error::{AgentError, Result};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::RwLock;
use std::time::{Duration, Instant};
use tracing::{debug, warn};

/// Ceilings shared by every session in the process.
///
/// The struct only carries the configured values; enforcement happens
/// wherever these fields are read.
#[derive(Debug, Clone)]
pub struct GlobalLimits {
    /// How many sessions may be open concurrently.
    pub max_sessions: usize,
    /// How many blocks may be held in memory, summed over all contexts.
    pub max_total_context_blocks: usize,
    /// Rate-limit ceiling, expressed in operations per second.
    pub max_ops_per_second: f64,
    /// Timeout applied to any single operation.
    pub operation_timeout: Duration,
}

impl Default for GlobalLimits {
    /// Stock configuration: 100 sessions, 100 000 context blocks,
    /// 1000 operations per second and a 30 second operation timeout.
    fn default() -> Self {
        const OPERATION_TIMEOUT_SECS: u64 = 30;

        GlobalLimits {
            operation_timeout: Duration::from_secs(OPERATION_TIMEOUT_SECS),
            max_ops_per_second: 1000.0,
            max_total_context_blocks: 100_000,
            max_sessions: 100,
        }
    }
}

/// Ceilings applied to one individual session.
#[derive(Debug, Clone)]
pub struct SessionLimits {
    /// Token ceiling for the context window.
    pub max_context_tokens: usize,
    /// Block ceiling for the context window.
    pub max_context_blocks: usize,
    /// Deepest level a single expansion may reach.
    pub max_expand_depth: usize,
    /// Most blocks a single operation may return.
    pub max_results_per_operation: usize,
    /// Number of operations allowed before a pause is forced.
    pub max_operations_before_checkpoint: usize,
    /// Inactivity period after which the session times out.
    pub session_timeout: Duration,
    /// Largest navigation history that is retained.
    pub max_history_size: usize,
    /// Allowance for the costly operation categories.
    pub budget: OperationBudget,
}

impl Default for SessionLimits {
    /// Stock configuration: 8 000 tokens, 200 blocks, expansion depth 10,
    /// 100 results per operation, a checkpoint every 1000 operations,
    /// a 30 minute inactivity timeout, 100 history entries and the default
    /// [`OperationBudget`].
    fn default() -> Self {
        const INACTIVITY_MINUTES: u64 = 30;

        let session_timeout = Duration::from_secs(INACTIVITY_MINUTES * 60);
        let budget = OperationBudget::default();

        SessionLimits {
            max_context_tokens: 8_000,
            max_context_blocks: 200,
            max_expand_depth: 10,
            max_results_per_operation: 100,
            max_operations_before_checkpoint: 1000,
            session_timeout,
            max_history_size: 100,
            budget,
        }
    }
}

/// Allowance for the operation categories that are costly to run.
#[derive(Debug, Clone)]
pub struct OperationBudget {
    /// How many traversal operations are allowed in total.
    pub traversal_operations: usize,
    /// How many search operations are allowed in total.
    pub search_operations: usize,
    /// How many blocks may be read in total.
    pub blocks_read: usize,
}

impl Default for OperationBudget {
    /// Stock allowance: 10 000 traversals, 100 searches, 50 000 block reads.
    fn default() -> Self {
        let (traversal_operations, search_operations, blocks_read) = (10_000, 100, 50_000);

        OperationBudget {
            traversal_operations,
            search_operations,
            blocks_read,
        }
    }
}

/// Running usage totals for each budgeted operation category.
///
/// The counters are atomics, so they can be updated through `&self`; every
/// access uses `Ordering::Relaxed`.
#[derive(Debug, Default)]
pub struct BudgetTracker {
    /// Traversal operations recorded so far.
    pub traversal_ops_used: AtomicUsize,
    /// Search operations recorded so far.
    pub search_ops_used: AtomicUsize,
    /// Blocks recorded as read so far.
    pub blocks_read_used: AtomicUsize,
}

impl BudgetTracker {
    /// Creates a tracker with all counters at zero.
    pub fn new() -> Self {
        BudgetTracker::default()
    }

    /// Counts one traversal operation.
    pub fn record_traversal(&self) {
        self.traversal_ops_used.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one search operation.
    pub fn record_search(&self) {
        self.search_ops_used.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds `count` to the number of blocks read.
    pub fn record_blocks_read(&self, count: usize) {
        self.blocks_read_used.fetch_add(count, Ordering::Relaxed);
    }

    /// Succeeds while fewer than `budget.traversal_operations` traversals
    /// have been recorded.
    ///
    /// # Errors
    ///
    /// Returns `AgentError::BudgetExhausted` with operation type
    /// `"traversal"` once the recorded count reaches the limit.
    pub fn check_traversal_budget(&self, budget: &OperationBudget) -> Result<()> {
        Self::ensure_below(
            &self.traversal_ops_used,
            budget.traversal_operations,
            "traversal",
        )
    }

    /// Succeeds while fewer than `budget.search_operations` searches have
    /// been recorded.
    ///
    /// # Errors
    ///
    /// Returns `AgentError::BudgetExhausted` with operation type `"search"`
    /// once the recorded count reaches the limit.
    pub fn check_search_budget(&self, budget: &OperationBudget) -> Result<()> {
        Self::ensure_below(&self.search_ops_used, budget.search_operations, "search")
    }

    /// Succeeds while fewer than `budget.blocks_read` blocks have been
    /// recorded as read.
    ///
    /// # Errors
    ///
    /// Returns `AgentError::BudgetExhausted` with operation type
    /// `"blocks_read"` once the recorded count reaches the limit.
    pub fn check_blocks_budget(&self, budget: &OperationBudget) -> Result<()> {
        Self::ensure_below(&self.blocks_read_used, budget.blocks_read, "blocks_read")
    }

    /// Sets every counter back to zero.
    pub fn reset(&self) {
        for counter in [
            &self.traversal_ops_used,
            &self.search_ops_used,
            &self.blocks_read_used,
        ] {
            counter.store(0, Ordering::Relaxed);
        }
    }

    /// Shared check: `Ok` while `counter` is strictly below `limit`,
    /// otherwise a `BudgetExhausted` error tagged with `operation_type`.
    fn ensure_below(counter: &AtomicUsize, limit: usize, operation_type: &str) -> Result<()> {
        if counter.load(Ordering::Relaxed) < limit {
            Ok(())
        } else {
            Err(AgentError::BudgetExhausted {
                operation_type: operation_type.to_string(),
            })
        }
    }
}

/// Circuit breaker state.
///
/// These are the three states a `CircuitBreaker` reports from `state()`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitState {
    /// Normal operation: `CircuitBreaker::can_proceed` returns `Ok(())`.
    Closed,
    /// Failing: `can_proceed` returns `AgentError::CircuitOpen` until the
    /// breaker's recovery timeout has elapsed since the last recorded failure.
    Open,
    /// Testing recovery: requests are let through; enough recorded successes
    /// close the circuit, while a recorded failure re-opens it.
    HalfOpen,
}

/// Circuit breaker for detecting runaway operations.
pub struct CircuitBreaker {
    state: RwLock<CircuitState>,
    failure_count: AtomicUsize,
    failure_threshold: usize,
    recovery_timeout: Duration,
    last_failure: RwLock<Option<Instant>>,
    success_count_in_half_open: AtomicUsize,
    success_threshold: usize,
}

impl CircuitBreaker {
    pub fn new(failure_threshold: usize, recovery_timeout: Duration) -> Self {
        Self {
            state: RwLock::new(CircuitState::Closed),
            failure_count: AtomicUsize::new(0),
            failure_threshold,
            recovery_timeout,
            last_failure: RwLock::new(None),
            success_count_in_half_open: AtomicUsize::new(0),
            success_threshold: 3, // Require 3 successes to close
        }
    }

    pub fn state(&self) -> CircuitState {
        *self.state.read().unwrap()
    }

    pub fn can_proceed(&self) -> Result<()> {
        let state = *self.state.read().unwrap();

        match state {
            CircuitState::Closed => Ok(()),
            CircuitState::Open => {
                // Check if recovery timeout has passed
                let last_failure = self.last_failure.read().unwrap();
                if let Some(last) = *last_failure {
                    if last.elapsed() >= self.recovery_timeout {
                        // Transition to half-open
                        drop(last_failure);
                        *self.state.write().unwrap() = CircuitState::HalfOpen;
                        self.success_count_in_half_open.store(0, Ordering::Relaxed);
                        debug!("Circuit breaker transitioning to half-open");
                        return Ok(());
                    }
                }
                Err(AgentError::CircuitOpen {
                    reason: "Too many failures, circuit is open".to_string(),
                })
            }
            CircuitState::HalfOpen => {
                // Allow one request through to test
                Ok(())
            }
        }
    }

    pub fn record_success(&self) {
        let state = *self.state.read().unwrap();

        match state {
            CircuitState::Closed => {
                // Reset failure count on success
                self.failure_count.store(0, Ordering::Relaxed);
            }
            CircuitState::HalfOpen => {
                let successes = self
                    .success_count_in_half_open
                    .fetch_add(1, Ordering::Relaxed)
                    + 1;
                if successes >= self.success_threshold {
                    // Transition back to closed
                    *self.state.write().unwrap() = CircuitState::Closed;
                    self.failure_count.store(0, Ordering::Relaxed);
                    debug!("Circuit breaker closed after successful recovery");
                }
            }
            CircuitState::Open => {
                // Shouldn't happen, but ignore
            }
        }
    }

    pub fn record_failure(&self) {
        let state = *self.state.read().unwrap();

        match state {
            CircuitState::Closed => {
                let failures = self.failure_count.fetch_add(1, Ordering::Relaxed) + 1;
                if failures >= self.failure_threshold {
                    *self.state.write().unwrap() = CircuitState::Open;
                    *self.last_failure.write().unwrap() = Some(Instant::now());
                    warn!(
                        "Circuit breaker opened after {} failures",
                        self.failure_threshold
                    );
                }
            }
            CircuitState::HalfOpen => {
                // Failure during recovery - go back to open
                *self.state.write().unwrap() = CircuitState::Open;
                *self.last_failure.write().unwrap() = Some(Instant::now());
                self.success_count_in_half_open.store(0, Ordering::Relaxed);
                warn!("Circuit breaker re-opened after failure during half-open");
            }
            CircuitState::Open => {
                // Update last failure time
                *self.last_failure.write().unwrap() = Some(Instant::now());
            }
        }
    }

    pub fn reset(&self) {
        *self.state.write().unwrap() = CircuitState::Closed;
        self.failure_count.store(0, Ordering::Relaxed);
        *self.last_failure.write().unwrap() = None;
        self.success_count_in_half_open.store(0, Ordering::Relaxed);
    }
}

impl Default for CircuitBreaker {
    fn default() -> Self {
        Self::new(5, Duration::from_secs(30))
    }
}

/// RAII guard for depth tracking.
///
/// Holds one level of depth on the [`DepthGuard`] that issued it; dropping
/// the handle gives that level back.
#[must_use = "the depth level is released as soon as the handle is dropped"]
#[derive(Debug)]
pub struct DepthGuardHandle<'a> {
    guard: &'a DepthGuard,
}

impl<'a> Drop for DepthGuardHandle<'a> {
    /// Releases the level claimed by the `try_enter` call that created this handle.
    fn drop(&mut self) {
        self.guard.current.fetch_sub(1, Ordering::Relaxed);
    }
}

/// Depth guard prevents infinite recursion.
///
/// Tracks how many [`DepthGuardHandle`]s are currently alive and refuses to
/// hand out more than `max` at once.
#[derive(Debug)]
pub struct DepthGuard {
    /// Number of live handles.
    current: AtomicUsize,
    /// Largest number of handles that may be alive at the same time.
    max: usize,
}

impl DepthGuard {
    /// Creates a guard at depth zero that allows at most `max` nested levels.
    pub fn new(max: usize) -> Self {
        Self {
            current: AtomicUsize::new(0),
            max,
        }
    }

    /// Try to enter a deeper level. Returns a guard handle if successful.
    ///
    /// The counter is only incremented when the new depth stays within the
    /// limit, so a rejected attempt never pushes it above `max`, not even
    /// transiently for concurrent observers.
    ///
    /// # Errors
    ///
    /// Returns `AgentError::DepthLimitExceeded` when `max` levels are already
    /// in use; `current` in the error is the depth the caller tried to reach.
    pub fn try_enter(&self) -> Result<DepthGuardHandle<'_>> {
        let max = self.max;
        self.current
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |depth| {
                if depth < max {
                    Some(depth + 1)
                } else {
                    None
                }
            })
            .map(|_| DepthGuardHandle { guard: self })
            .map_err(|depth| AgentError::DepthLimitExceeded {
                current: depth.saturating_add(1),
                max,
            })
    }

    /// Returns the number of levels currently entered.
    pub fn current_depth(&self) -> usize {
        self.current.load(Ordering::Relaxed)
    }

    /// Returns the configured depth limit.
    pub fn max_depth(&self) -> usize {
        self.max
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// The traversal budget holds below the limit, trips at it, and clears on reset.
    #[test]
    fn test_budget_tracker() {
        let limits = OperationBudget {
            blocks_read: 10,
            search_operations: 2,
            traversal_operations: 3,
        };
        let usage = BudgetTracker::new();

        // Two of the three allowed traversals: still inside the budget.
        for _ in 0..2 {
            usage.record_traversal();
        }
        assert!(usage.check_traversal_budget(&limits).is_ok());

        // The third one reaches the limit.
        usage.record_traversal();
        assert!(usage.check_traversal_budget(&limits).is_err());

        // Clearing the counters makes the budget available again.
        usage.reset();
        assert!(usage.check_traversal_budget(&limits).is_ok());
    }

    /// Walks the breaker through closed -> open -> half-open -> closed.
    #[test]
    fn test_circuit_breaker() {
        let recovery = Duration::from_millis(100);
        let breaker = CircuitBreaker::new(3, recovery);

        // A fresh breaker is closed and lets callers through.
        assert_eq!(breaker.state(), CircuitState::Closed);
        assert!(breaker.can_proceed().is_ok());

        // Two failures stay below the threshold of three.
        breaker.record_failure();
        breaker.record_failure();
        assert!(breaker.can_proceed().is_ok());

        // The third failure opens the circuit.
        breaker.record_failure();
        assert_eq!(breaker.state(), CircuitState::Open);
        assert!(breaker.can_proceed().is_err());

        // Once the recovery timeout has passed, the next probe is admitted
        // and the breaker reports half-open.
        std::thread::sleep(Duration::from_millis(150));
        assert!(breaker.can_proceed().is_ok());
        assert_eq!(breaker.state(), CircuitState::HalfOpen);

        // Three successes while half-open close the circuit again.
        for _ in 0..3 {
            breaker.record_success();
        }
        assert_eq!(breaker.state(), CircuitState::Closed);
    }

    /// Depth rises with each live handle, is capped at the limit, and falls
    /// as handles are dropped.
    #[test]
    fn test_depth_guard() {
        let limiter = DepthGuard::new(3);
        assert_eq!(limiter.current_depth(), 0);

        let outer = limiter.try_enter().unwrap();
        assert_eq!(limiter.current_depth(), 1);

        let middle = limiter.try_enter().unwrap();
        assert_eq!(limiter.current_depth(), 2);

        let inner = limiter.try_enter().unwrap();
        assert_eq!(limiter.current_depth(), 3);

        // A fourth level is over the limit.
        assert!(limiter.try_enter().is_err());

        // Release innermost first, checking the depth after each drop.
        drop(inner);
        assert_eq!(limiter.current_depth(), 2);

        drop(middle);
        assert_eq!(limiter.current_depth(), 1);

        drop(outer);
        assert_eq!(limiter.current_depth(), 0);
    }
}