opi-agent 0.5.0

General-purpose agent runtime with tool calling and session management
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
//! Extension system for agent customization.
//!
//! Provides the [`Extension`] trait for registering lifecycle hooks, custom
//! tools, custom commands, custom agent messages, and scoped extension state.
//!
//! # Lifecycle Ordering
//!
//! Extension hooks are called in registration order after the base hooks:
//!
//! 1. Base [`AgentHooks::before_tool_call`] then extension
//!    [`Extension::on_before_tool_call`] for each extension (in registration
//!    order).
//! 2. Base [`AgentHooks::after_tool_call`] then extension
//!    [`Extension::on_after_tool_call`] for each extension (in registration
//!    order).
//! 3. Base [`AgentHooks::prepare_next_turn`] then extension
//!    [`Extension::prepare_next_turn`] for each extension (in registration
//!    order). Extra messages are appended in that order.
//!
//! If any hook in the chain returns a deny/block result, the chain stops and
//! the denial propagates. Extensions cannot override a denial from the base
//! hooks or from an earlier extension.
//!
//! # Hook Error/Blocking Semantics
//!
//! - `on_before_tool_call` returning [`ExtensionHookResult::Block`] prevents
//!   the tool from executing. The block reason is returned to the agent loop
//!   as a tool error.
//! - `on_after_tool_call` is an observer callback; it cannot modify tool
//!   results. The base hook retains full control over result replacement via
//!   [`AfterToolCallResult::Replace`].
//! - `on_command` errors propagate to the caller via
//!   [`ExtensionError::CommandError`]. If an extension returns an error, the
//!   dispatch stops and the error is returned.
//!
//! # State Serialization
//!
//! Each extension can serialize and restore its own state. Extension states
//! are keyed by extension name in the serialized map produced by
//! [`ExtensionRegistry::serialize_states`]. Extensions that don't need state
//! persistence can use the default (no-op) implementations of
//! [`Extension::serialize_state`] and [`Extension::restore_state`].
//!
//! # Custom Tools
//!
//! Extensions provide tools via the [`Extension::tools`] method. These tools
//! are collected during [`ExtensionRegistry::collect_tools`] and added to the
//! agent's tool set alongside built-in tools. Extension tools follow the same
//! [`Tool`] trait contract and validation rules as built-in tools.
//!
//! # Custom Commands
//!
//! Extensions handle custom commands via [`Extension::on_command`]. Commands
//! are dispatched via [`ExtensionRegistry::dispatch_command`] to extensions in
//! registration order. The first extension that returns `Ok(Some(value))`
//! claims the command.
//!
//! # Unstable
//!
//! This module is part of the **unstable 0.x extension API**. Breaking changes
//! may occur between minor versions without a major version bump.

use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

use serde_json::Value;
use tokio_util::sync::CancellationToken;

use opi_ai::provider::{ModelInfo, Provider};

use crate::event::AgentEvent;
use crate::hooks::{
    AfterToolCallContext, AfterToolCallResult, AgentHooks, BeforeToolCallContext,
    BeforeToolCallResult, PrepareNextTurnContext, ShouldStopAfterTurnContext,
};
use crate::loop_types::{AgentError, AgentLoopTurnUpdate};
use crate::message::AgentMessage;
use crate::tool::{Tool, ToolResult};

// ---------------------------------------------------------------------------
// Error types
// ---------------------------------------------------------------------------

/// Errors from extension operations.
#[derive(Debug, thiserror::Error)]
pub enum ExtensionError {
    /// An extension with the same name is already registered.
    #[error("duplicate extension name: {0}")]
    DuplicateName(String),
    /// The registry has already been shared with hooks or an event sink.
    #[error("cannot register extensions after registry has been shared")]
    RegistryLocked,
    /// Extension state serialization failed.
    #[error("state serialization failed for extension '{name}': {reason}")]
    StateSerialization { name: String, reason: String },
    /// Extension state restoration failed.
    #[error("state restoration failed for extension '{name}': {reason}")]
    StateRestoration { name: String, reason: String },
    /// An extension command returned an error.
    #[error("extension command error: {0}")]
    CommandError(String),
    /// A generic extension error.
    #[error("{0}")]
    Other(String),
}

// ---------------------------------------------------------------------------
// Hook result
// ---------------------------------------------------------------------------

/// Result of an extension lifecycle hook.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub enum ExtensionHookResult {
    /// Continue processing. The next hook in the chain (or the default
    /// behavior) is invoked.
    Continue,
    /// Block further processing. For `on_before_tool_call`, this denies the
    /// tool execution. The block reason is returned to the agent loop as a
    /// tool error.
    Block { reason: String },
}

// ---------------------------------------------------------------------------
// Extension command
// ---------------------------------------------------------------------------

/// A custom command dispatched to extensions.
#[derive(Debug, Clone)]
pub struct ExtensionCommand {
    /// The command name (e.g. `"todo/add"`).
    pub name: String,
    /// Optional ID for response correlation.
    pub id: Option<String>,
    /// Command arguments.
    pub args: Value,
}

impl ExtensionCommand {
    /// Create a new extension command.
    pub fn new(name: impl Into<String>, args: Value) -> Self {
        Self {
            name: name.into(),
            id: None,
            args,
        }
    }

    /// Add an ID for response correlation.
    pub fn with_id(mut self, id: impl Into<String>) -> Self {
        self.id = Some(id.into());
        self
    }
}

// ---------------------------------------------------------------------------
// Extension trait
// ---------------------------------------------------------------------------

/// Extension trait for registering lifecycle hooks, custom tools,
/// custom commands, and scoped extension state.
///
/// See the [module-level documentation](self) for lifecycle ordering,
/// error/blocking semantics, and state serialization contracts.
///
/// # Unstable
///
/// This trait is part of the **unstable 0.x extension API**. Breaking changes
/// may occur between minor versions without a major version bump.
pub trait Extension: Send + Sync {
    /// Unique name for this extension. Must be non-empty and unique within
    /// the registry.
    fn name(&self) -> &str;

    /// Tools provided by this extension.
    ///
    /// Called once during [`ExtensionRegistry::collect_tools`] to gather
    /// extension tools for the agent's tool set.
    fn tools(&self) -> Vec<Box<dyn Tool>> {
        vec![]
    }

    /// Custom providers provided by this extension.
    ///
    /// Called during [`ExtensionRegistry::collect_providers`] to gather
    /// providers for registration with the provider registry. Extensions
    /// should return new provider instances on each call since `Box<dyn
    /// Provider>` is not `Clone`.
    ///
    /// Provider breadth should arrive through registration rather than core
    /// provider additions.
    fn providers(&self) -> Vec<Box<dyn Provider>> {
        vec![]
    }

    /// Additional models to register for existing providers.
    ///
    /// Called during [`ExtensionRegistry::collect_model_overrides`] to gather
    /// model metadata that supplements or overrides the models declared by
    /// built-in providers. Each entry is `(provider_id, ModelInfo)`.
    fn model_overrides(&self) -> Vec<(String, ModelInfo)> {
        vec![]
    }

    /// Called before a tool is executed (after the base hook, in registration
    /// order).
    ///
    /// Return [`ExtensionHookResult::Block`] to deny the tool execution.
    fn on_before_tool_call(
        &self,
        tool_name: &str,
        args: &Value,
    ) -> Pin<Box<dyn Future<Output = ExtensionHookResult> + Send>> {
        let _ = (tool_name, args);
        Box::pin(async { ExtensionHookResult::Continue })
    }

    /// Called after a tool has been executed (after the base hook, in
    /// registration order).
    ///
    /// This is an observer callback; it cannot modify the tool result.
    fn on_after_tool_call(
        &self,
        tool_name: &str,
        result: &ToolResult,
    ) -> Pin<Box<dyn Future<Output = ()> + Send>> {
        let _ = (tool_name, result);
        Box::pin(async {})
    }

    /// Prepare context before the next turn begins.
    ///
    /// Extensions may return extra messages to inject into the agent's next
    /// turn. Composite hooks append these messages after the base hook's
    /// messages and preserve extension registration order.
    fn prepare_next_turn(
        &self,
        _ctx: &PrepareNextTurnContext,
    ) -> Pin<Box<dyn Future<Output = Option<AgentLoopTurnUpdate>> + Send>> {
        Box::pin(async { None })
    }

    /// Called for every agent event.
    fn on_event(&self, _event: &AgentEvent) {}

    /// Handle a custom command.
    ///
    /// Return `Ok(Some(value))` if the command was handled, `Ok(None)` if
    /// the command is not recognized by this extension.
    fn on_command(
        &self,
        _command: &ExtensionCommand,
    ) -> Pin<Box<dyn Future<Output = Result<Option<Value>, ExtensionError>> + Send>> {
        Box::pin(async { Ok(None) })
    }

    /// Serialize extension state for session persistence.
    ///
    /// Return `Ok(Some(value))` with the serialized state, or `Ok(None)` if
    /// the extension has no state to persist.
    fn serialize_state(&self) -> Result<Option<Value>, ExtensionError> {
        Ok(None)
    }

    /// Restore extension state from session persistence.
    fn restore_state(&self, _state: Value) -> Result<(), ExtensionError> {
        Ok(())
    }
}

// ---------------------------------------------------------------------------
// ExtensionRegistry
// ---------------------------------------------------------------------------

/// Registry that manages extensions and provides integration wrappers.
///
/// Extensions are registered before the agent loop starts. Once hooks or event
/// sinks are wrapped via [`wrap_hooks`](Self::wrap_hooks) or
/// [`wrap_event_sink`](Self::wrap_event_sink), the registry should not be
/// modified further.
pub struct ExtensionRegistry {
    extensions: Arc<Vec<Box<dyn Extension>>>,
}

impl Clone for ExtensionRegistry {
    fn clone(&self) -> Self {
        Self {
            extensions: self.extensions.clone(),
        }
    }
}

impl ExtensionRegistry {
    /// Create an empty registry.
    pub fn new() -> Self {
        Self {
            extensions: Arc::new(Vec::new()),
        }
    }

    /// Register an extension.
    ///
    /// Returns an error if an extension with the same name already exists.
    /// Returns [`ExtensionError::RegistryLocked`] if called after
    /// `wrap_hooks()` or `wrap_event_sink()` has been called (i.e., the
    /// extension list is shared).
    pub fn register(&mut self, ext: Box<dyn Extension>) -> Result<(), ExtensionError> {
        let name = ext.name().to_string();
        if self.extensions.iter().any(|e| e.name() == name) {
            return Err(ExtensionError::DuplicateName(name));
        }
        match Arc::get_mut(&mut self.extensions) {
            Some(exts) => {
                exts.push(ext);
            }
            None => {
                return Err(ExtensionError::RegistryLocked);
            }
        }
        Ok(())
    }

    /// Returns true if no extensions are registered.
    pub fn is_empty(&self) -> bool {
        self.extensions.is_empty()
    }

    /// Returns the number of registered extensions.
    pub fn len(&self) -> usize {
        self.extensions.len()
    }

    /// Return extension names in registration order.
    pub fn names(&self) -> Vec<&str> {
        self.extensions.iter().map(|e| e.name()).collect()
    }

    /// Look up an extension by name.
    pub fn get(&self, name: &str) -> Option<&dyn Extension> {
        self.extensions
            .iter()
            .find(|e| e.name() == name)
            .map(|e| e.as_ref())
    }

    /// Collect all tools from all registered extensions.
    pub fn collect_tools(&self) -> Vec<Box<dyn Tool>> {
        self.extensions.iter().flat_map(|e| e.tools()).collect()
    }

    /// Collect all custom providers from all registered extensions.
    ///
    /// Each extension's [`providers`](Extension::providers) method is called
    /// and the results are concatenated. Extensions should return fresh
    /// provider instances since `Box<dyn Provider>` is not `Clone`.
    pub fn collect_providers(&self) -> Vec<Box<dyn Provider>> {
        self.extensions.iter().flat_map(|e| e.providers()).collect()
    }

    /// Collect all model overrides from all registered extensions.
    ///
    /// Each extension's [`model_overrides`](Extension::model_overrides) method
    /// is called and the results are concatenated.
    pub fn collect_model_overrides(&self) -> Vec<(String, ModelInfo)> {
        self.extensions
            .iter()
            .flat_map(|e| e.model_overrides())
            .collect()
    }

    /// Dispatch an event to all registered extensions.
    pub fn dispatch_event(&self, event: &AgentEvent) {
        for ext in self.extensions.iter() {
            ext.on_event(event);
        }
    }

    /// Dispatch a custom command to extensions in registration order.
    ///
    /// Returns the first `Some` response, or `None` if no extension handled
    /// the command.
    pub async fn dispatch_command(
        &self,
        command: &ExtensionCommand,
    ) -> Result<Option<Value>, ExtensionError> {
        for ext in self.extensions.iter() {
            if let Some(value) = ext.on_command(command).await? {
                return Ok(Some(value));
            }
        }
        Ok(None)
    }

    /// Serialize all extension states into a JSON object keyed by extension
    /// name.
    pub fn serialize_states(&self) -> Result<Value, ExtensionError> {
        let mut map = serde_json::Map::new();
        for ext in self.extensions.iter() {
            match ext.serialize_state() {
                Ok(Some(state)) => {
                    map.insert(ext.name().to_string(), state);
                }
                Ok(None) => {}
                Err(e) => return Err(e),
            }
        }
        Ok(Value::Object(map))
    }

    /// Restore extension states from a JSON object keyed by extension name.
    pub fn restore_states(&self, states: Value) -> Result<(), ExtensionError> {
        let map = match states {
            Value::Object(m) => m,
            _ => return Ok(()),
        };
        for ext in self.extensions.iter() {
            if let Some(state) = map.get(ext.name()) {
                ext.restore_state(state.clone())?;
            }
        }
        Ok(())
    }

    /// Create a composite [`AgentHooks`] that wraps the base hooks with
    /// extension lifecycle callbacks.
    ///
    /// Extension hooks are called after the base hooks. If any extension
    /// returns [`ExtensionHookResult::Block`], the chain stops and the block
    /// propagates as a denial.
    pub fn wrap_hooks(&self, base: Box<dyn AgentHooks>) -> Box<dyn AgentHooks> {
        Box::new(CompositeHooks {
            base: Arc::from(base),
            extensions: self.extensions.clone(),
        })
    }

    /// Wrap an event sink to dispatch events to all registered extensions
    /// before forwarding to the base sink.
    pub fn wrap_event_sink(
        &self,
        base_sink: crate::event::AgentEventSink,
    ) -> crate::event::AgentEventSink {
        let extensions = self.extensions.clone();
        Box::new(move |event: AgentEvent| {
            for ext in extensions.iter() {
                ext.on_event(&event);
            }
            base_sink(event);
        })
    }
}

impl Default for ExtensionRegistry {
    fn default() -> Self {
        Self::new()
    }
}

// ---------------------------------------------------------------------------
// CompositeHooks
// ---------------------------------------------------------------------------

/// Internal type that chains extension hooks after base hooks.
struct CompositeHooks {
    base: Arc<dyn AgentHooks>,
    extensions: Arc<Vec<Box<dyn Extension>>>,
}

impl AgentHooks for CompositeHooks {
    fn convert_to_llm(
        &self,
        messages: &[AgentMessage],
    ) -> Result<Vec<opi_ai::message::Message>, AgentError> {
        self.base.convert_to_llm(messages)
    }

    fn transform_context(
        &self,
        messages: Vec<AgentMessage>,
        signal: CancellationToken,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<AgentMessage>, AgentError>> + Send>> {
        self.base.transform_context(messages, signal)
    }

    fn should_stop_after_turn(
        &self,
        ctx: ShouldStopAfterTurnContext,
    ) -> Pin<Box<dyn Future<Output = bool> + Send>> {
        self.base.should_stop_after_turn(ctx)
    }

    fn before_tool_call(
        &self,
        ctx: BeforeToolCallContext,
    ) -> Pin<Box<dyn Future<Output = BeforeToolCallResult> + Send>> {
        let base = self.base.clone();
        let extensions = self.extensions.clone();
        let tool_name = ctx.tool_name.clone();
        let args = ctx.args.clone();
        Box::pin(async move {
            // Base hook decides first.
            match base.before_tool_call(ctx).await {
                BeforeToolCallResult::Allow => {}
                BeforeToolCallResult::Deny { reason } => {
                    return BeforeToolCallResult::Deny { reason };
                }
            }

            // Extension hooks in registration order.
            for ext in extensions.iter() {
                match ext.on_before_tool_call(&tool_name, &args).await {
                    ExtensionHookResult::Continue => {}
                    ExtensionHookResult::Block { reason } => {
                        return BeforeToolCallResult::Deny { reason };
                    }
                }
            }

            BeforeToolCallResult::Allow
        })
    }

    fn after_tool_call(
        &self,
        ctx: AfterToolCallContext,
    ) -> Pin<Box<dyn Future<Output = AfterToolCallResult> + Send>> {
        let base = self.base.clone();
        let extensions = self.extensions.clone();
        let tool_name = ctx.tool_name.clone();
        let result_snapshot = ctx.result.clone();
        Box::pin(async move {
            // Base hook decides first (may keep or replace).
            let base_result = base.after_tool_call(ctx).await;

            // Determine the effective result for extension observation.
            let effective: &ToolResult = match &base_result {
                AfterToolCallResult::Keep => &result_snapshot,
                AfterToolCallResult::Replace(r) => r,
            };

            // Notify extension observers (cannot modify result).
            for ext in extensions.iter() {
                ext.on_after_tool_call(&tool_name, effective).await;
            }

            base_result
        })
    }

    fn prepare_next_turn(
        &self,
        ctx: PrepareNextTurnContext,
    ) -> Pin<Box<dyn Future<Output = Option<AgentLoopTurnUpdate>> + Send>> {
        let base = self.base.clone();
        let extensions = self.extensions.clone();
        let extension_ctx = PrepareNextTurnContext {
            messages: ctx.messages.clone(),
            turn: ctx.turn,
        };
        Box::pin(async move {
            let mut extra_messages = base
                .prepare_next_turn(ctx)
                .await
                .map(|update| update.extra_messages)
                .unwrap_or_default();

            for ext in extensions.iter() {
                if let Some(update) = ext.prepare_next_turn(&extension_ctx).await {
                    extra_messages.extend(update.extra_messages);
                }
            }

            if extra_messages.is_empty() {
                None
            } else {
                Some(AgentLoopTurnUpdate { extra_messages })
            }
        })
    }
}