Skip to main content

agy_bridge/agent/
mod.rs

1//! Agent lifecycle management for the Antigravity SDK bridge.
2//!
3//! Provides [`AgentHandle`](crate::agent::AgentHandle) which wraps the lifecycle of a single SDK agent:
4//! creation, chatting, conversation tracking, and shutdown with RAII warnings.
5
6use std::sync::{
7    Arc, Mutex,
8    atomic::{AtomicBool, Ordering},
9};
10
11use crate::{
12    config::AgentConfig,
13    content::Content,
14    error::Error,
15    streaming::{ChatResponseHandle, ChatResponseSharedState},
16    types::{ConversationMessage, UsageMetadata},
17};
18
/// Default backoff duration when a quota/429 error doesn't include a
/// `Retry-After` header.
///
/// In this file it is the `retry_after` passed to `handle_quota_error` when
/// the runtime's error is recognised only through `is_quota_error()` rather
/// than arriving as `Error::QuotaExceeded { retry_after }`.
const DEFAULT_QUOTA_BACKOFF: std::time::Duration = std::time::Duration::from_secs(2);

/// `retry_after` carried by the fallback `Error::QuotaExceeded` that `chat()`
/// and `send()` return if their retry loop ends without producing a result.
///
/// NOTE(review): `handle_quota_error` already returns an error on the final
/// attempt (carrying the `retry_after` it was given), so this fallback looks
/// like it is only reached defensively — confirm before relying on the value.
const QUOTA_EXHAUSTED_RETRY_AFTER: std::time::Duration = std::time::Duration::from_mins(2);
25
26#[cfg(test)]
27pub(crate) mod mock;
28
/// Unique identifier for an agent within the bridge.
///
/// Returned by [`Runtime::create_agent`], passed to every per-agent
/// [`Runtime`] method, and used as the key of the global bridge-state map.
pub type AgentId = u64;
31
/// Trait abstracting the Python runtime interface.
///
/// This allows unit tests to inject a mock runtime without requiring a live
/// Python interpreter. The real implementation will call through to `PyO3`.
///
/// Every per-agent method takes the [`AgentId`] returned by
/// [`create_agent`](Self::create_agent). Methods without a default body must
/// be provided by each implementation; the ones with defaults return a
/// neutral value (`Ok(None)`, an empty list, `Ok(())`, `Ok(true)`) or do
/// nothing.
#[expect(
    async_fn_in_trait,
    reason = "Runtime is not object-safe by design; callers always know the concrete type"
)]
pub trait Runtime: Send + Sync {
    /// Create an agent from the given config, returning its ID.
    async fn create_agent(&self, config: AgentConfig) -> Result<AgentId, Error>;

    /// Send a chat message to the agent, returning a streaming response handle.
    ///
    /// The `content` parameter accepts any [`Content`] variant: plain text,
    /// images, documents, audio, video, or a multi-part list.
    async fn chat(&self, agent_id: AgentId, content: &Content)
    -> Result<ChatResponseHandle, Error>;

    /// Gracefully shut down the agent.
    async fn shutdown_agent(&self, agent_id: AgentId) -> Result<(), Error>;

    /// Interrupt any active prompt/chat run.
    async fn cancel(&self, agent_id: AgentId) -> Result<(), Error>;

    /// Wait for the active run or conversational loop to stabilize.
    async fn wait_for_idle(&self, agent_id: AgentId) -> Result<(), Error>;

    /// Send a message without waiting for completion.
    ///
    /// Unlike [`chat`](Self::chat), no response handle is returned.
    async fn send(&self, agent_id: AgentId, content: &Content) -> Result<(), Error>;

    /// Signal that the agent is idle.
    async fn signal_idle(&self, agent_id: AgentId) -> Result<(), Error>;

    /// Wait for the agent to wake up. Returns true if woken, false if timed out.
    async fn wait_for_wakeup(
        &self,
        agent_id: AgentId,
        timeout: std::time::Duration,
    ) -> Result<bool, Error>;

    /// Wait if we're in a quota backoff period.
    ///
    /// Not called by `AgentHandle` in this file, which waits on its per-key
    /// quota state instead.
    async fn wait_for_quota(&self);

    /// Record a quota hit with the suggested retry duration.
    ///
    /// Not called by `AgentHandle` in this file, which records hits on its
    /// per-key quota state instead.
    async fn record_quota_hit(&self, retry_after: std::time::Duration);

    /// Access this runtime's per-key quota registry.
    ///
    /// Each runtime owns its own [`QuotaRegistry`](crate::quota::QuotaRegistry),
    /// so different runtimes have fully independent quota tracking.
    /// `AgentHandle::new` uses it to look up the quota state for the agent's
    /// effective API key.
    fn quota_registry(&self) -> &crate::quota::QuotaRegistry;

    /// Retrieve the conversation's message history.
    async fn history(&self, agent_id: AgentId) -> Result<Vec<ConversationMessage>, Error>;

    /// Return the number of completed turns in the conversation.
    async fn turn_count(&self, agent_id: AgentId) -> Result<u32, Error>;

    /// Return cumulative token usage across all turns.
    async fn total_usage(&self, agent_id: AgentId) -> Result<UsageMetadata, Error>;

    /// Return token usage from the most recent turn only.
    async fn last_turn_usage(&self, agent_id: AgentId) -> Result<UsageMetadata, Error>;

    /// Clear the conversation history and reset state.
    async fn clear_history(&self, agent_id: AgentId) -> Result<(), Error>;

    /// Return the text of the last model response, if any.
    ///
    /// Default implementation returns `Ok(None)`.
    async fn last_response(&self, _agent_id: AgentId) -> Result<Option<String>, Error> {
        Ok(None)
    }

    /// Return the step indices at which compaction occurred.
    ///
    /// Default implementation returns an empty list.
    async fn compaction_indices(&self, _agent_id: AgentId) -> Result<Vec<u32>, Error> {
        Ok(Vec::new())
    }

    /// Delete the conversation and all associated state.
    ///
    /// Default implementation is a no-op that returns `Ok(())`.
    async fn delete(&self, _agent_id: AgentId) -> Result<(), Error> {
        Ok(())
    }

    /// Disconnect from the agent without deleting state.
    ///
    /// Default implementation is a no-op that returns `Ok(())`.
    async fn disconnect(&self, _agent_id: AgentId) -> Result<(), Error> {
        Ok(())
    }

    /// Check whether the agent is currently idle (not running a turn).
    ///
    /// Default implementation returns `Ok(true)`.
    async fn is_idle(&self, _agent_id: AgentId) -> Result<bool, Error> {
        Ok(true)
    }

    /// Best-effort synchronous shutdown signal, called from [`Drop`].
    ///
    /// Unlike [`shutdown_agent`](Self::shutdown_agent), this is sync and
    /// fire-and-forget — it cannot return errors. The default is a no-op;
    /// implementations backed by a command channel should `try_send` a
    /// shutdown command here.
    fn try_shutdown_agent(&self, _agent_id: AgentId) {}
}
143
/// Handle to a running agent.
///
/// Wraps the agent's lifecycle: creation, chat, and shutdown.
///
/// Call [`shutdown()`](Self::shutdown) for a clean, error-reported shutdown.
/// If the handle is dropped while still started and not marked shut down,
/// the `Drop` impl calls [`Runtime::try_shutdown_agent`] — a synchronous,
/// fire-and-forget signal that cannot report errors. `Drop` also removes
/// this agent's entry from the global bridge state in every case.
///
/// Most methods take `&self` — interior mutability is used where needed
/// so multiple concurrent operations can share a single handle.
///
/// # Mutex choice
///
/// This type uses [`std::sync::Mutex`] rather than [`tokio::sync::Mutex`]
/// because every lock acquisition is a brief, synchronous operation (pointer
/// swap or clone) that **never** spans an `.await` point. For these
/// microsecond critical sections, `std::sync::Mutex` is both simpler and
/// lower-overhead than the async alternative.
pub struct AgentHandle<R: Runtime + 'static> {
    /// Identifier returned by the runtime when the agent was created.
    id: AgentId,
    /// Runtime that owns the agent; shared with any spawned subagents.
    runtime: Arc<R>,
    /// Configuration the agent was created with.
    config: AgentConfig,
    /// Per-API-key quota state. Agents sharing the same effective API key
    /// share backoff tracking; agents with different keys are independent.
    quota_state: Arc<crate::quota::QuotaState>,
    /// Kept alive for the agent's lifetime so the global `BRIDGE_STATE`
    /// entry isn't the only strong reference.
    _registry: Option<Arc<crate::tools::ToolRegistry>>,
    /// Kept alive to preserve a strong reference to the policy confirmation handler.
    /// Also handed to subagents created through `spawn_subagent`.
    policy_handler: Option<Arc<dyn crate::policies::AskUserHandler>>,
    /// Conversation ID, seeded from `config.conversation_id` and updated via
    /// `set_conversation_id`.
    conversation_id: Mutex<Option<String>>,
    /// Set to `true` when the handle is constructed.
    is_started: AtomicBool,
    /// Set by `shutdown()`, `delete()` and `disconnect()` — regardless of
    /// whether the underlying runtime call succeeded.
    is_shutdown: AtomicBool,
    /// Shared state from the last completed chat response, used to surface
    /// `get_last_structured_output()` without round-tripping to Python.
    ///
    /// Wrapped in a `Mutex` so `chat()` can take `&self` instead of `&mut self`,
    /// enabling concurrent usage patterns. The lock is brief (pointer swap only).
    last_shared_state: Mutex<Option<Arc<Mutex<ChatResponseSharedState>>>>,
}
185
186impl<R: Runtime> AgentHandle<R> {
187    /// Create a new agent from the given runtime and configuration.
188    ///
189    /// This sends a `CreateAgent` command to the Python runtime, waits for
190    /// quota availability, and returns the handle.
191    ///
192    /// # Errors
193    ///
194    /// Returns a [`Error`] if agent creation fails (e.g. invalid config,
195    /// Python error, or quota exceeded).
196    pub async fn new(
197        runtime: Arc<R>,
198        config: AgentConfig,
199        registry: Option<Arc<crate::tools::ToolRegistry>>,
200        hook_runner: Option<Arc<crate::hooks::Hooks>>,
201        policy_handler: Option<Arc<dyn crate::policies::AskUserHandler>>,
202    ) -> Result<Self, Error> {
203        let quota_key = config.effective_api_key().unwrap_or_default();
204        let quota_state = runtime.quota_registry().state_for_key(&quota_key);
205        let id = if hook_runner.is_some() {
206            // Serialize the set→create→clear sequence so concurrent creates
207            // cannot overwrite each other's temporary hook runner.
208            //
209            // NOTE: These must remain process-global because the Python-side
210            // callback (`dispatch_rust_hook`) is itself process-global — it is
211            // registered in `sys.modules["_agy_bridge_globals"]` and has no way
212            // to identify which runtime instance triggered it. A per-runtime
213            // guard would not prevent cross-runtime races on the shared
214            // INITIALIZING_HOOK_RUNNER slot.
215            let _guard = crate::runtime::CREATE_AGENT_HOOK_GUARD.lock().await;
216            if let Ok(mut opt) = crate::runtime::INITIALIZING_HOOK_RUNNER.lock() {
217                *opt = hook_runner.as_ref().map(Arc::clone);
218            } else {
219                tracing::error!("INITIALIZING_HOOK_RUNNER mutex poisoned — hook may not fire");
220            }
221            let result = runtime.create_agent(config.clone()).await;
222            if let Ok(mut opt) = crate::runtime::INITIALIZING_HOOK_RUNNER.lock() {
223                *opt = None;
224            } else {
225                tracing::error!("INITIALIZING_HOOK_RUNNER mutex poisoned — stale hook may persist");
226            }
227            result?
228        } else {
229            runtime.create_agent(config.clone()).await?
230        };
231
232        tracing::info!(agent_id = id, "Agent created successfully");
233
234        // Build and insert per-agent bridge state in a single lock acquisition.
235        let policies_set = crate::policies::PolicySet::validated_from(config.policies.clone())?;
236        let bridge_entry = crate::runtime::AgentBridgeState {
237            registry: registry.as_ref().map(Arc::clone),
238            hook_runner: hook_runner.as_ref().map(Arc::clone),
239            policies: policies_set,
240            policy_handler: policy_handler.as_ref().map(Arc::clone),
241            tool_state: Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())),
242        };
243        if let Ok(mut map) = crate::runtime::bridge_state().write() {
244            map.insert(id, bridge_entry);
245        } else {
246            tracing::error!(
247                agent_id = id,
248                "Failed to acquire write lock on BRIDGE_STATE"
249            );
250        }
251
252        let conversation_id = Mutex::new(config.conversation_id.clone());
253        Ok(Self {
254            id,
255            runtime,
256            config,
257            quota_state,
258            _registry: registry,
259            policy_handler,
260            conversation_id,
261            is_started: AtomicBool::new(true),
262            is_shutdown: AtomicBool::new(false),
263            last_shared_state: Mutex::new(None),
264        })
265    }
266
267    /// Send a message and receive a streaming response.
268    ///
269    /// Accepts any type that converts into [`Content`]: `&str`, `String`,
270    /// [`Image`](crate::content::Image), [`Document`](crate::content::Document),
271    /// [`Audio`](crate::content::Audio), [`Video`](crate::content::Video), or a
272    /// `Vec<ContentPrimitive>` for multimodal input.
273    ///
274    /// Automatically backs off on quota limits (HTTP 429).
275    ///
276    /// # Errors
277    ///
278    /// Returns a [`Error`] on chat failure (Python error, timeout, etc.).
279    pub async fn chat(&self, content: impl Into<Content>) -> Result<ChatResponseHandle, Error> {
280        if !self.is_started() {
281            return Err(Error::AgentNotStarted);
282        }
283
284        let content = content.into();
285        let max_retries = self.config.max_quota_retries.unwrap_or(0);
286
287        let handle = 'retry: {
288            for attempt in 0..=max_retries {
289                if attempt > 0 {
290                    self.quota_state.wait_for_quota().await;
291                }
292                match self.runtime.chat(self.id, &content).await {
293                    Ok(h) => break 'retry h,
294                    Err(Error::QuotaExceeded { retry_after }) => {
295                        self.handle_quota_error("chat", attempt, max_retries, retry_after)?;
296                    }
297                    Err(ref e) if e.is_quota_error() => {
298                        self.handle_quota_error(
299                            "chat",
300                            attempt,
301                            max_retries,
302                            DEFAULT_QUOTA_BACKOFF,
303                        )?;
304                    }
305                    Err(e) => return Err(e),
306                }
307            }
308            return Err(Error::QuotaExceeded {
309                retry_after: QUOTA_EXHAUSTED_RETRY_AFTER,
310            });
311        };
312
313        if let Ok(mut guard) = self.last_shared_state.lock() {
314            *guard = Some(Arc::clone(&handle.shared_state));
315        } else {
316            tracing::error!("last_shared_state mutex poisoned — streaming metadata may be stale");
317        }
318        Ok(handle)
319    }
320
321    /// Send a message and return the final text response.
322    ///
323    /// This is a convenience wrapper around [`chat`](Self::chat) that drains
324    /// the streaming response into a single `String`. If tools were associated
325    /// with the agent at creation time, the Python runtime handles tool
326    /// execution automatically.
327    ///
328    /// # Errors
329    ///
330    /// Returns [`Error`] if the chat turn fails or stream errors occur.
331    pub async fn chat_text(&self, message: impl Into<Content>) -> Result<String, Error> {
332        let response = self.chat(message.into()).await?;
333        let text = response.text().await.map_err(|e| {
334            let converted = Error::from(e);
335            if matches!(converted, Error::Safety) {
336                converted
337            } else {
338                Error::BackendError {
339                    message: format!("Failed to read response text: {converted}"),
340                }
341            }
342        })?;
343        Ok(text.into_string())
344    }
345
346    /// Return the current conversation ID, if one has been set.
347    ///
348    /// Returns a cloned `String` because the underlying value is behind a
349    /// [`Mutex`] (interior mutability for `&self` access).
350    #[must_use]
351    pub fn conversation_id(&self) -> Option<String> {
352        self.conversation_id
353            .lock()
354            .inspect_err(|e| {
355                tracing::error!(
356                    agent_id = self.id,
357                    error = %e,
358                    "conversation_id mutex poisoned"
359                );
360            })
361            .ok()
362            .and_then(|guard| guard.clone())
363    }
364
365    /// Set the conversation ID (called when the SDK assigns one).
366    ///
367    /// Takes `&self` rather than `&mut self` so the handle can be shared
368    /// across concurrent tasks.
369    pub fn set_conversation_id(&self, id: String) {
370        if let Ok(mut guard) = self.conversation_id.lock() {
371            *guard = Some(id);
372        } else {
373            tracing::error!("Failed to acquire lock on conversation_id");
374        }
375    }
376
377    /// Check whether the agent has been started and is not yet shut down.
378    #[must_use]
379    pub fn is_started(&self) -> bool {
380        self.is_started.load(Ordering::SeqCst) && !self.is_shutdown.load(Ordering::SeqCst)
381    }
382
383    /// Return the agent's unique identifier.
384    #[must_use]
385    pub const fn id(&self) -> AgentId {
386        self.id
387    }
388
389    /// Return a reference to the agent's configuration.
390    #[must_use]
391    pub const fn config(&self) -> &AgentConfig {
392        &self.config
393    }
394
395    /// Interrupt the active chat prompt execution.
396    ///
397    /// # Errors
398    ///
399    /// Returns a [`Error`] if the cancellation call fails.
400    pub async fn cancel(&self) -> Result<(), Error> {
401        self.runtime.cancel(self.id).await
402    }
403
404    /// Wait for the conversation or active run to stabilize and become idle.
405    ///
406    /// # Errors
407    ///
408    /// Returns a [`Error`] if the wait call fails.
409    pub async fn wait_for_idle(&self) -> Result<(), Error> {
410        self.runtime.wait_for_idle(self.id).await
411    }
412
413    /// Retrieve the conversation's message history.
414    ///
415    /// # Errors
416    ///
417    /// Returns [`Error`] if the query fails.
418    pub async fn history(&self) -> Result<Vec<ConversationMessage>, Error> {
419        self.runtime.history(self.id).await
420    }
421
422    /// Return the number of completed turns in the conversation.
423    ///
424    /// # Errors
425    ///
426    /// Returns [`Error`] if the query fails.
427    pub async fn turn_count(&self) -> Result<u32, Error> {
428        self.runtime.turn_count(self.id).await
429    }
430
431    /// Return cumulative token usage across all turns.
432    ///
433    /// # Errors
434    ///
435    /// Returns [`Error`] if the query fails.
436    pub async fn total_usage(&self) -> Result<UsageMetadata, Error> {
437        self.runtime.total_usage(self.id).await
438    }
439
440    /// Return token usage from the most recent turn only.
441    ///
442    /// # Errors
443    ///
444    /// Returns [`Error`] if the query fails.
445    pub async fn last_turn_usage(&self) -> Result<UsageMetadata, Error> {
446        self.runtime.last_turn_usage(self.id).await
447    }
448
449    /// Clear the conversation history and reset state.
450    ///
451    /// # Errors
452    ///
453    /// Returns [`Error`] if the operation fails.
454    pub async fn clear_history(&self) -> Result<(), Error> {
455        self.runtime.clear_history(self.id).await
456    }
457
458    /// Return the text of the last model response, if any.
459    ///
460    /// # Errors
461    ///
462    /// Returns [`Error`] if the query fails.
463    pub async fn last_response(&self) -> Result<Option<String>, Error> {
464        self.runtime.last_response(self.id).await
465    }
466
467    /// Return the step indices at which conversation compaction occurred.
468    ///
469    /// # Errors
470    ///
471    /// Returns [`Error`] if the query fails.
472    pub async fn compaction_indices(&self) -> Result<Vec<u32>, Error> {
473        self.runtime.compaction_indices(self.id).await
474    }
475
476    /// Delete the conversation and all associated state.
477    ///
478    /// After calling this method, the agent handle is no longer usable
479    /// for chat operations. This also marks the agent as shut down.
480    ///
481    /// # Errors
482    ///
483    /// Returns [`Error`] if the delete operation fails.
484    pub async fn delete(&self) -> Result<(), Error> {
485        let result = self.runtime.delete(self.id).await;
486        self.is_shutdown.store(true, Ordering::SeqCst);
487        result
488    }
489
490    /// Disconnect from the agent without deleting its state.
491    ///
492    /// The agent's conversation state is preserved but this handle
493    /// can no longer send messages. Marks the agent as shut down.
494    ///
495    /// # Errors
496    ///
497    /// Returns [`Error`] if the disconnect operation fails.
498    pub async fn disconnect(&self) -> Result<(), Error> {
499        let result = self.runtime.disconnect(self.id).await;
500        self.is_shutdown.store(true, Ordering::SeqCst);
501        result
502    }
503
504    /// Check whether the agent is currently idle (not running a turn).
505    ///
506    /// # Errors
507    ///
508    /// Returns [`Error`] if the query fails.
509    pub async fn is_idle(&self) -> Result<bool, Error> {
510        self.runtime.is_idle(self.id).await
511    }
512
513    /// Return the structured output from the last chat response, if any.
514    ///
515    /// Only populated after a [`chat()`](Self::chat) round-trip when the
516    /// agent was configured with a `response_schema` and the model returned
517    /// a valid JSON payload.
518    #[must_use]
519    pub fn get_last_structured_output(&self) -> Option<serde_json::Value> {
520        let guard = self
521            .last_shared_state
522            .lock()
523            .inspect_err(|e| {
524                tracing::error!(
525                    agent_id = self.id,
526                    error = %e,
527                    "last_shared_state mutex poisoned in get_last_structured_output"
528                );
529            })
530            .ok()?;
531        let state = guard
532            .as_ref()?
533            .lock()
534            .inspect_err(|e| {
535                tracing::error!(
536                    agent_id = self.id,
537                    error = %e,
538                    "ChatResponseSharedState mutex poisoned in get_last_structured_output"
539                );
540            })
541            .ok()?;
542        state.structured_output.clone()
543    }
544
545    /// Return the usage metadata from the last chat response, if any.
546    #[must_use]
547    pub fn get_last_usage(&self) -> Option<UsageMetadata> {
548        let guard = self
549            .last_shared_state
550            .lock()
551            .inspect_err(|e| {
552                tracing::error!(
553                    agent_id = self.id,
554                    error = %e,
555                    "last_shared_state mutex poisoned in get_last_usage"
556                );
557            })
558            .ok()?;
559        let state = guard
560            .as_ref()?
561            .lock()
562            .inspect_err(|e| {
563                tracing::error!(
564                    agent_id = self.id,
565                    error = %e,
566                    "ChatResponseSharedState mutex poisoned in get_last_usage"
567                );
568            })
569            .ok()?;
570        state.usage.clone()
571    }
572
573    /// Send a message without waiting for a response.
574    ///
575    /// Fire-and-forget: the message is delivered to the agent but no
576    /// streaming response is produced.
577    ///
578    /// # Errors
579    ///
580    /// Returns a [`Error`] if sending fails.
581    pub async fn send(&self, content: impl Into<Content>) -> Result<(), Error> {
582        if !self.is_started() {
583            return Err(Error::AgentNotStarted);
584        }
585
586        let content = content.into();
587
588        let max_retries = self.config.max_quota_retries.unwrap_or(0);
589
590        for attempt in 0..=max_retries {
591            if attempt > 0 {
592                self.quota_state.wait_for_quota().await;
593            }
594            match self.runtime.send(self.id, &content).await {
595                Ok(()) => return Ok(()),
596                Err(Error::QuotaExceeded { retry_after }) => {
597                    self.handle_quota_error("send", attempt, max_retries, retry_after)?;
598                }
599                Err(ref e) if e.is_quota_error() => {
600                    self.handle_quota_error("send", attempt, max_retries, DEFAULT_QUOTA_BACKOFF)?;
601                }
602                Err(e) => return Err(e),
603            }
604        }
605        Err(Error::QuotaExceeded {
606            retry_after: QUOTA_EXHAUSTED_RETRY_AFTER,
607        })
608    }
609
610    /// Signal that this agent is idle and ready to receive input.
611    ///
612    /// # Errors
613    ///
614    /// Returns a [`Error`] if the signal call fails.
615    pub async fn signal_idle(&self) -> Result<(), Error> {
616        self.runtime.signal_idle(self.id).await
617    }
618
619    /// Wait for the agent to wake up, returning `true` if woken or
620    /// `false` if the `timeout` elapsed.
621    ///
622    /// # Errors
623    ///
624    /// Returns a [`Error`] if the wait call fails.
625    pub async fn wait_for_wakeup(&self, timeout: std::time::Duration) -> Result<bool, Error> {
626        self.runtime.wait_for_wakeup(self.id, timeout).await
627    }
628
629    /// Handle a quota/429 error from a retryable operation.
630    fn handle_quota_error(
631        &self,
632        operation: &str,
633        attempt: u32,
634        max_retries: u32,
635        retry_after: std::time::Duration,
636    ) -> Result<(), Error> {
637        if attempt >= max_retries {
638            return Err(Error::QuotaExceeded { retry_after });
639        }
640        tracing::warn!(
641            agent_id = self.id,
642            attempt = attempt + 1,
643            max = max_retries,
644            retry_after_ms = u64::try_from(retry_after.as_millis()).unwrap_or_else(|e| {
645                tracing::warn!("Int conversion failed: {e}");
646                u64::MAX
647            }),
648            "Quota exceeded on {operation} — recording hit and retrying"
649        );
650        self.quota_state.record_quota_hit(retry_after);
651        Ok(())
652    }
653
654    /// Gracefully shut down the agent.
655    ///
656    /// This sends a `ShutdownAgent` command to the Python runtime, which
657    /// calls `__aexit__()` on the SDK agent. The handle remains usable
658    /// for read-only queries (e.g. [`is_started()`](Self::is_started))
659    /// after shutdown.
660    ///
661    /// # Errors
662    ///
663    /// Returns a [`Error`] if shutdown fails. The `is_shutdown`
664    /// flag is always set so the `Drop` impl will not emit a warning.
665    pub async fn shutdown(&self) -> Result<(), Error> {
666        if self.is_shutdown.load(Ordering::SeqCst) {
667            tracing::debug!(agent_id = self.id, "Agent already shut down");
668            return Ok(());
669        }
670
671        tracing::info!(agent_id = self.id, "Shutting down agent");
672        let result = self.runtime.shutdown_agent(self.id).await;
673
674        // Always mark as shut down so Drop doesn't warn, even on failure.
675        self.is_shutdown.store(true, Ordering::SeqCst);
676
677        match result {
678            Ok(()) => {
679                tracing::info!(agent_id = self.id, "Agent shut down successfully");
680            }
681            Err(ref e) => {
682                tracing::error!(agent_id = self.id, error = ?e, "Agent shutdown failed");
683            }
684        }
685
686        result
687    }
688
689    /// Spawn a subagent from the given config, sharing this agent's runtime.
690    ///
691    /// If a `ToolRegistry` is provided and `config.tools` is empty, the
692    /// registry's definitions are automatically applied.
693    ///
694    /// # Errors
695    ///
696    /// Returns a [`Error`] if agent creation fails.
697    pub async fn spawn_subagent(
698        &self,
699        mut config: AgentConfig,
700        registry: impl Into<Option<crate::tools::ToolRegistry>>,
701    ) -> Result<Self, Error> {
702        let opt_registry = registry.into();
703        if let Some(disp) = &opt_registry
704            && config.tools.is_empty()
705        {
706            config.tools = disp.definitions();
707        }
708        let arc_registry = opt_registry.map(Arc::new);
709        Self::new(
710            Arc::clone(&self.runtime),
711            config,
712            arc_registry,
713            None,
714            self.policy_handler.clone(),
715        )
716        .await
717    }
718}
719
720impl<R: Runtime> Drop for AgentHandle<R> {
721    fn drop(&mut self) {
722        if self.is_started.load(Ordering::SeqCst) && !self.is_shutdown.load(Ordering::SeqCst) {
723            tracing::debug!(
724                agent_id = self.id,
725                "AgentHandle dropped without explicit shutdown() — \
726                 sending best-effort shutdown signal"
727            );
728            self.runtime.try_shutdown_agent(self.id);
729        }
730
731        // Clean up global bridge state entry.
732        if let Ok(mut map) = crate::runtime::bridge_state().write() {
733            map.remove(&self.id);
734        } else {
735            tracing::error!(
736                agent_id = self.id,
737                "BRIDGE_STATE RwLock poisoned during Drop — \
738                 bridge state entry for this agent may leak"
739            );
740        }
741    }
742}