Skip to main content

agy_bridge/agent/
mod.rs

1//! Agent lifecycle management for the Antigravity SDK bridge.
2//!
3//! Provides [`AgentHandle`](crate::agent::AgentHandle) which wraps the lifecycle of a single SDK agent:
4//! creation, chatting, conversation tracking, and shutdown with RAII warnings.
5
6use std::sync::{
7    Arc, Mutex,
8    atomic::{AtomicBool, Ordering},
9};
10
11use crate::{
12    config::AgentConfig,
13    content::Content,
14    error::Error,
15    streaming::{ChatResponseHandle, ChatResponseSharedState},
16    types::{ConversationMessage, UsageMetadata},
17};
18
/// Default backoff duration when a quota/429 error doesn't include a
/// `Retry-After` header.
///
/// In this module it is used whenever an error is classified as a quota
/// error (`Error::is_quota_error`, or a quota-looking message while draining
/// response text) but is not an `Error::QuotaExceeded` carrying its own
/// `retry_after`.
const DEFAULT_QUOTA_BACKOFF: std::time::Duration = std::time::Duration::from_secs(2);

/// Duration reported to the caller when all quota retries are exhausted.
///
/// Used as the `retry_after` of the `Error::QuotaExceeded` that `chat` and
/// `send` return once their retry loop has run to completion.
const QUOTA_EXHAUSTED_RETRY_AFTER: std::time::Duration = std::time::Duration::from_mins(2);
25
/// Test-only support module; compiled only under `cfg(test)`.
#[cfg(test)]
pub(crate) mod mock;
28
/// Unique identifier for an agent within the bridge.
///
/// Produced by [`Runtime::create_agent`] and passed back to every other
/// per-agent [`Runtime`] method to select the agent to operate on.
pub type AgentId = u64;
31
/// Trait abstracting the Python runtime interface.
///
/// This allows unit tests to inject a mock runtime without requiring a live
/// Python interpreter. The real implementation will call through to `PyO3`.
///
/// Implementations must be `Send + Sync`; [`AgentHandle`] stores the runtime
/// behind an `Arc` and calls it through `&self`.
#[expect(
    async_fn_in_trait,
    reason = "Runtime is not object-safe by design; callers always know the concrete type"
)]
pub trait Runtime: Send + Sync {
    /// Create an agent from the given config, returning its ID.
    async fn create_agent(&self, config: AgentConfig) -> Result<AgentId, Error>;

    /// Send a chat message to the agent, returning a streaming response handle.
    ///
    /// The `content` parameter accepts any [`Content`] variant: plain text,
    /// images, documents, audio, video, or a multi-part list.
    async fn chat(&self, agent_id: AgentId, content: &Content)
    -> Result<ChatResponseHandle, Error>;

    /// Gracefully shut down the agent.
    async fn shutdown_agent(&self, agent_id: AgentId) -> Result<(), Error>;

    /// Interrupt any active prompt/chat run.
    async fn cancel(&self, agent_id: AgentId) -> Result<(), Error>;

    /// Wait for the active run or conversational loop to stabilize.
    async fn wait_for_idle(&self, agent_id: AgentId) -> Result<(), Error>;

    /// Send a message without waiting for completion.
    async fn send(&self, agent_id: AgentId, content: &Content) -> Result<(), Error>;

    /// Signal that the agent is idle.
    async fn signal_idle(&self, agent_id: AgentId) -> Result<(), Error>;

    /// Wait for the agent to wake up. Returns true if woken, false if timed out.
    async fn wait_for_wakeup(
        &self,
        agent_id: AgentId,
        timeout: std::time::Duration,
    ) -> Result<bool, Error>;

    /// Acquire a keep-alive permit during quota waits.
    ///
    /// Default implementation returns `None` (no permit). When a permit is
    /// returned, [`AgentHandle::chat`] attaches it to the response handle it
    /// hands back to the caller.
    async fn acquire_quota_keep_alive_permit(&self) -> Option<tokio::sync::OwnedSemaphorePermit> {
        None
    }

    /// Wait if we're in a quota backoff period.
    ///
    /// NOTE(review): [`AgentHandle`] in this module waits on the per-key
    /// state obtained from [`quota_registry`](Self::quota_registry) and does
    /// not call this method — confirm who the intended callers are.
    async fn wait_for_quota(&self);

    /// Record a quota hit with the suggested retry duration.
    ///
    /// NOTE(review): as with [`wait_for_quota`](Self::wait_for_quota),
    /// [`AgentHandle`] records hits on the per-key quota state instead of
    /// calling this method.
    async fn record_quota_hit(&self, retry_after: std::time::Duration);

    /// Access this runtime's per-key quota registry.
    ///
    /// Each runtime owns its own [`QuotaRegistry`](crate::quota::QuotaRegistry),
    /// so different runtimes have fully independent quota tracking.
    fn quota_registry(&self) -> &crate::quota::QuotaRegistry;

    /// Retrieve the conversation's message history.
    async fn history(&self, agent_id: AgentId) -> Result<Vec<ConversationMessage>, Error>;

    /// Return the number of completed turns in the conversation.
    async fn turn_count(&self, agent_id: AgentId) -> Result<u32, Error>;

    /// Return cumulative token usage across all turns.
    async fn total_usage(&self, agent_id: AgentId) -> Result<UsageMetadata, Error>;

    /// Return token usage from the most recent turn only.
    async fn last_turn_usage(&self, agent_id: AgentId) -> Result<UsageMetadata, Error>;

    /// Clear the conversation history and reset state.
    async fn clear_history(&self, agent_id: AgentId) -> Result<(), Error>;

    /// Return the text of the last model response, if any.
    ///
    /// Default implementation returns `Ok(None)`.
    async fn last_response(&self, _agent_id: AgentId) -> Result<Option<String>, Error> {
        Ok(None)
    }

    /// Return the step indices at which compaction occurred.
    ///
    /// Default implementation returns an empty list.
    async fn compaction_indices(&self, _agent_id: AgentId) -> Result<Vec<u32>, Error> {
        Ok(Vec::new())
    }

    /// Delete the conversation and all associated state.
    ///
    /// Default implementation is a no-op that returns `Ok(())`.
    async fn delete(&self, _agent_id: AgentId) -> Result<(), Error> {
        Ok(())
    }

    /// Disconnect from the agent without deleting state.
    ///
    /// Default implementation is a no-op that returns `Ok(())`.
    async fn disconnect(&self, _agent_id: AgentId) -> Result<(), Error> {
        Ok(())
    }

    /// Check whether the agent is currently idle (not running a turn).
    ///
    /// Default implementation returns `Ok(true)`.
    async fn is_idle(&self, _agent_id: AgentId) -> Result<bool, Error> {
        Ok(true)
    }

    /// Best-effort synchronous shutdown signal, called from [`Drop`].
    ///
    /// Unlike [`shutdown_agent`](Self::shutdown_agent), this is sync and
    /// fire-and-forget — it cannot return errors. The default is a no-op;
    /// implementations backed by a command channel should `try_send` a
    /// shutdown command here.
    ///
    /// [`AgentHandle`]'s `Drop` impl calls this only when the handle was
    /// started and never marked as shut down.
    fn try_shutdown_agent(&self, _agent_id: AgentId) {}
}
148
/// Handle to a running agent.
///
/// Wraps the agent's lifecycle: creation, chat, and shutdown.
///
/// Call [`shutdown()`](Self::shutdown) for a clean, error-reported shutdown.
/// If the handle is dropped without calling `shutdown()`, the `Drop` impl
/// sends a best-effort shutdown signal through the synchronous,
/// fire-and-forget [`Runtime::try_shutdown_agent`] and removes the agent's
/// entry from the global bridge state. That path cannot report errors, and
/// because the trait's default `try_shutdown_agent` is a no-op, whether the
/// Python agent is actually cleaned up depends on the runtime implementation.
///
/// Most methods take `&self` — interior mutability is used where needed
/// so multiple concurrent operations can share a single handle.
///
/// # Mutex choice
///
/// This type uses [`std::sync::Mutex`] rather than [`tokio::sync::Mutex`]
/// because every lock acquisition is a brief, synchronous operation (pointer
/// swap or clone) that **never** spans an `.await` point. For these
/// microsecond critical sections, `std::sync::Mutex` is both simpler and
/// lower-overhead than the async alternative.
pub struct AgentHandle<R: Runtime + 'static> {
    /// Id returned by [`Runtime::create_agent`] for this agent.
    id: AgentId,
    /// Runtime that owns the agent; shared with subagents spawned from this handle.
    runtime: Arc<R>,
    /// Configuration the agent was created with.
    config: AgentConfig,
    /// Per-API-key quota state. Agents sharing the same effective API key
    /// share backoff tracking; agents with different keys are independent.
    quota_state: Arc<crate::quota::QuotaState>,
    /// Kept alive for the agent's lifetime so the global `BRIDGE_STATE`
    /// entry isn't the only strong reference.
    _registry: Option<Arc<crate::tools::ToolRegistry>>,
    /// Kept alive to preserve a strong reference to the policy confirmation handler.
    /// Also handed to subagents created via `spawn_subagent`.
    policy_handler: Option<Arc<dyn crate::policies::AskUserHandler>>,
    /// Conversation id; seeded from `config.conversation_id` and replaceable
    /// through `set_conversation_id`.
    conversation_id: Mutex<Option<String>>,
    /// Set to `true` when the handle is constructed.
    is_started: AtomicBool,
    /// Set once `shutdown`, `delete`, or `disconnect` has run (even if the
    /// runtime call failed); suppresses the shutdown signal in `Drop`.
    is_shutdown: AtomicBool,
    /// Shared state from the last completed chat response, used to surface
    /// `get_last_structured_output()` without round-tripping to Python.
    ///
    /// Wrapped in a `Mutex` so `chat()` can take `&self` instead of `&mut self`,
    /// enabling concurrent usage patterns. The lock is brief (pointer swap only).
    last_shared_state: Mutex<Option<Arc<Mutex<ChatResponseSharedState>>>>,
}
190
191impl<R: Runtime> AgentHandle<R> {
192    /// Create a new agent from the given runtime and configuration.
193    ///
194    /// This sends a `CreateAgent` command to the Python runtime, waits for
195    /// quota availability, and returns the handle.
196    ///
197    /// # Errors
198    ///
199    /// Returns a [`Error`] if agent creation fails (e.g. invalid config,
200    /// Python error, or quota exceeded).
201    pub async fn new(
202        runtime: Arc<R>,
203        config: AgentConfig,
204        registry: Option<Arc<crate::tools::ToolRegistry>>,
205        hook_runner: Option<Arc<crate::hooks::Hooks>>,
206        policy_handler: Option<Arc<dyn crate::policies::AskUserHandler>>,
207    ) -> Result<Self, Error> {
208        let quota_key = config.effective_api_key().unwrap_or_default();
209        let quota_state = runtime.quota_registry().state_for_key(&quota_key);
210        quota_state.wait_for_quota().await;
211        let id = if hook_runner.is_some() {
212            // Serialize the set→create→clear sequence so concurrent creates
213            // cannot overwrite each other's temporary hook runner.
214            //
215            // NOTE: These must remain process-global because the Python-side
216            // callback (`dispatch_rust_hook`) is itself process-global — it is
217            // registered in `sys.modules["_agy_bridge_globals"]` and has no way
218            // to identify which runtime instance triggered it. A per-runtime
219            // guard would not prevent cross-runtime races on the shared
220            // INITIALIZING_HOOK_RUNNER slot.
221            let _guard = crate::runtime::CREATE_AGENT_HOOK_GUARD.lock().await;
222            if let Ok(mut opt) = crate::runtime::INITIALIZING_HOOK_RUNNER.lock() {
223                *opt = hook_runner.as_ref().map(Arc::clone);
224            } else {
225                tracing::error!("INITIALIZING_HOOK_RUNNER mutex poisoned — hook may not fire");
226            }
227            let result = runtime.create_agent(config.clone()).await;
228            if let Ok(mut opt) = crate::runtime::INITIALIZING_HOOK_RUNNER.lock() {
229                *opt = None;
230            } else {
231                tracing::error!("INITIALIZING_HOOK_RUNNER mutex poisoned — stale hook may persist");
232            }
233            result?
234        } else {
235            runtime.create_agent(config.clone()).await?
236        };
237
238        tracing::info!(agent_id = id, "Agent created successfully");
239
240        // Build and insert per-agent bridge state in a single lock acquisition.
241        let policies_set = crate::policies::PolicySet::validated_from(config.policies.clone())?;
242        let bridge_entry = crate::runtime::AgentBridgeState {
243            registry: registry.as_ref().map(Arc::clone),
244            hook_runner: hook_runner.as_ref().map(Arc::clone),
245            policies: policies_set,
246            policy_handler: policy_handler.as_ref().map(Arc::clone),
247            tool_state: Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())),
248        };
249        if let Ok(mut map) = crate::runtime::bridge_state().write() {
250            map.insert(id, bridge_entry);
251        } else {
252            tracing::error!(
253                agent_id = id,
254                "Failed to acquire write lock on BRIDGE_STATE"
255            );
256        }
257
258        let conversation_id = Mutex::new(config.conversation_id.clone());
259        Ok(Self {
260            id,
261            runtime,
262            config,
263            quota_state,
264            _registry: registry,
265            policy_handler,
266            conversation_id,
267            is_started: AtomicBool::new(true),
268            is_shutdown: AtomicBool::new(false),
269            last_shared_state: Mutex::new(None),
270        })
271    }
272
273    /// Send a message and receive a streaming response.
274    ///
275    /// Accepts any type that converts into [`Content`]: `&str`, `String`,
276    /// [`Image`](crate::content::Image), [`Document`](crate::content::Document),
277    /// [`Audio`](crate::content::Audio), [`Video`](crate::content::Video), or a
278    /// `Vec<ContentPrimitive>` for multimodal input.
279    ///
280    /// Automatically backs off on quota limits (HTTP 429).
281    ///
282    /// # Errors
283    ///
284    /// Returns a [`Error`] on chat failure (Python error, timeout, etc.).
285    pub async fn chat(&self, content: impl Into<Content>) -> Result<ChatResponseHandle, Error> {
286        if !self.is_started() {
287            return Err(Error::AgentNotStarted);
288        }
289
290        let content = content.into();
291
292        let permit = self.runtime.acquire_quota_keep_alive_permit().await;
293
294        let mut handle = 'retry: {
295            for attempt in 0..=Self::MAX_QUOTA_RETRIES {
296                self.quota_state.wait_for_quota().await;
297                match self.runtime.chat(self.id, &content).await {
298                    Ok(h) => break 'retry h,
299                    Err(Error::QuotaExceeded { retry_after }) => {
300                        self.handle_quota_error("chat", attempt, retry_after)?;
301                    }
302                    Err(ref e) if e.is_quota_error() => {
303                        self.handle_quota_error("chat", attempt, DEFAULT_QUOTA_BACKOFF)?;
304                    }
305                    Err(e) => return Err(e),
306                }
307            }
308            return Err(Error::QuotaExceeded {
309                retry_after: QUOTA_EXHAUSTED_RETRY_AFTER,
310            });
311        };
312
313        handle.keep_alive_permit = permit;
314        if let Ok(mut guard) = self.last_shared_state.lock() {
315            *guard = Some(Arc::clone(&handle.shared_state));
316        } else {
317            tracing::error!("last_shared_state mutex poisoned — streaming metadata may be stale");
318        }
319        Ok(handle)
320    }
321
322    /// Send a message and return the final text response.
323    ///
324    /// This is a convenience wrapper around [`chat`](Self::chat) that drains
325    /// the streaming response into a single `String`. If tools were associated
326    /// with the agent at creation time, the Python runtime handles tool
327    /// execution automatically.
328    ///
329    /// # Errors
330    ///
331    /// Returns [`Error`] if the chat turn fails or stream errors occur.
332    pub async fn chat_text(&self, message: impl Into<Content>) -> Result<String, Error> {
333        let response = self.chat(message.into()).await?;
334        self.drain_text_with_retry(response).await
335    }
336
337    /// Drain text from a [`ChatResponseHandle`], retrying on 429 quota errors.
338    async fn drain_text_with_retry(&self, response: ChatResponseHandle) -> Result<String, Error> {
339        match response.text().await {
340            Ok(result) => Ok(result.into_string()),
341            Err(e)
342                if e.message.contains("429")
343                    || e.message.contains("503")
344                    || e.message.contains("RESOURCE_EXHAUSTED") =>
345            {
346                Err(Error::QuotaExceeded {
347                    retry_after: DEFAULT_QUOTA_BACKOFF,
348                })
349            }
350            Err(e) => {
351                let converted = Error::from(e);
352                if matches!(converted, Error::Safety) {
353                    Err(converted)
354                } else {
355                    Err(Error::BackendError {
356                        message: format!("Failed to read response text: {converted}"),
357                    })
358                }
359            }
360        }
361    }
362
363    /// Return the current conversation ID, if one has been set.
364    ///
365    /// Returns a cloned `String` because the underlying value is behind a
366    /// [`Mutex`] (interior mutability for `&self` access).
367    #[must_use]
368    pub fn conversation_id(&self) -> Option<String> {
369        self.conversation_id
370            .lock()
371            .inspect_err(|e| {
372                tracing::error!(
373                    agent_id = self.id,
374                    error = %e,
375                    "conversation_id mutex poisoned"
376                );
377            })
378            .ok()
379            .and_then(|guard| guard.clone())
380    }
381
382    /// Set the conversation ID (called when the SDK assigns one).
383    ///
384    /// Takes `&self` rather than `&mut self` so the handle can be shared
385    /// across concurrent tasks.
386    pub fn set_conversation_id(&self, id: String) {
387        if let Ok(mut guard) = self.conversation_id.lock() {
388            *guard = Some(id);
389        } else {
390            tracing::error!("Failed to acquire lock on conversation_id");
391        }
392    }
393
394    /// Check whether the agent has been started and is not yet shut down.
395    #[must_use]
396    pub fn is_started(&self) -> bool {
397        self.is_started.load(Ordering::SeqCst) && !self.is_shutdown.load(Ordering::SeqCst)
398    }
399
    /// Return the agent's unique identifier.
    ///
    /// This is the id the runtime returned from `create_agent` when the
    /// handle was constructed.
    #[must_use]
    pub const fn id(&self) -> AgentId {
        self.id
    }
405
    /// Return a reference to the agent's configuration.
    ///
    /// This is the [`AgentConfig`] the handle was constructed with.
    #[must_use]
    pub const fn config(&self) -> &AgentConfig {
        &self.config
    }
411
    /// Interrupt the active chat prompt execution.
    ///
    /// Forwards to [`Runtime::cancel`] for this agent. No started/shut-down
    /// check is made here; behavior when nothing is running is up to the
    /// runtime implementation.
    ///
    /// # Errors
    ///
    /// Returns a [`Error`] if the cancellation call fails.
    pub async fn cancel(&self) -> Result<(), Error> {
        self.runtime.cancel(self.id).await
    }
420
    /// Wait for the conversation or active run to stabilize and become idle.
    ///
    /// Forwards to [`Runtime::wait_for_idle`] for this agent and resolves
    /// when that call does.
    ///
    /// # Errors
    ///
    /// Returns a [`Error`] if the wait call fails.
    pub async fn wait_for_idle(&self) -> Result<(), Error> {
        self.runtime.wait_for_idle(self.id).await
    }
429
    /// Retrieve the conversation's message history.
    ///
    /// Forwards to [`Runtime::history`] for this agent; the messages are
    /// returned exactly as the runtime provides them.
    ///
    /// # Errors
    ///
    /// Returns [`Error`] if the query fails.
    pub async fn history(&self) -> Result<Vec<ConversationMessage>, Error> {
        self.runtime.history(self.id).await
    }
438
    /// Return the number of completed turns in the conversation.
    ///
    /// Forwards to [`Runtime::turn_count`] for this agent.
    ///
    /// # Errors
    ///
    /// Returns [`Error`] if the query fails.
    pub async fn turn_count(&self) -> Result<u32, Error> {
        self.runtime.turn_count(self.id).await
    }
447
    /// Return cumulative token usage across all turns.
    ///
    /// Forwards to [`Runtime::total_usage`] for this agent. For the most
    /// recent turn only, see [`last_turn_usage`](Self::last_turn_usage).
    ///
    /// # Errors
    ///
    /// Returns [`Error`] if the query fails.
    pub async fn total_usage(&self) -> Result<UsageMetadata, Error> {
        self.runtime.total_usage(self.id).await
    }
456
    /// Return token usage from the most recent turn only.
    ///
    /// Forwards to [`Runtime::last_turn_usage`] for this agent, so it asks
    /// the runtime rather than reading state cached on the handle.
    ///
    /// # Errors
    ///
    /// Returns [`Error`] if the query fails.
    pub async fn last_turn_usage(&self) -> Result<UsageMetadata, Error> {
        self.runtime.last_turn_usage(self.id).await
    }
465
    /// Clear the conversation history and reset state.
    ///
    /// Forwards to [`Runtime::clear_history`] for this agent. State cached on
    /// the handle itself (conversation id, last response shared state) is not
    /// touched by this method.
    ///
    /// # Errors
    ///
    /// Returns [`Error`] if the operation fails.
    pub async fn clear_history(&self) -> Result<(), Error> {
        self.runtime.clear_history(self.id).await
    }
474
    /// Return the text of the last model response, if any.
    ///
    /// Forwards to [`Runtime::last_response`], whose default implementation
    /// always yields `Ok(None)`; a runtime must override it for this to
    /// return text.
    ///
    /// # Errors
    ///
    /// Returns [`Error`] if the query fails.
    pub async fn last_response(&self) -> Result<Option<String>, Error> {
        self.runtime.last_response(self.id).await
    }
483
    /// Return the step indices at which conversation compaction occurred.
    ///
    /// Forwards to [`Runtime::compaction_indices`], whose default
    /// implementation yields an empty list.
    ///
    /// # Errors
    ///
    /// Returns [`Error`] if the query fails.
    pub async fn compaction_indices(&self) -> Result<Vec<u32>, Error> {
        self.runtime.compaction_indices(self.id).await
    }
492
    /// Delete the conversation and all associated state.
    ///
    /// After calling this method, the agent handle is no longer usable
    /// for chat operations. This also marks the agent as shut down.
    ///
    /// The shut-down flag is set whether or not the runtime call succeeds,
    /// so even after a failed delete `chat`/`send` return
    /// `Error::AgentNotStarted` and `Drop` will not send its best-effort
    /// shutdown signal.
    ///
    /// # Errors
    ///
    /// Returns [`Error`] if the delete operation fails.
    pub async fn delete(&self) -> Result<(), Error> {
        let result = self.runtime.delete(self.id).await;
        // Mark as shut down unconditionally, then report the runtime's result.
        self.is_shutdown.store(true, Ordering::SeqCst);
        result
    }
506
    /// Disconnect from the agent without deleting its state.
    ///
    /// The agent's conversation state is preserved but this handle
    /// can no longer send messages. Marks the agent as shut down.
    ///
    /// The shut-down flag is set whether or not the runtime call succeeds,
    /// so `Drop` will not send a shutdown signal for a disconnected handle.
    ///
    /// # Errors
    ///
    /// Returns [`Error`] if the disconnect operation fails.
    pub async fn disconnect(&self) -> Result<(), Error> {
        let result = self.runtime.disconnect(self.id).await;
        // Mark as shut down unconditionally, then report the runtime's result.
        self.is_shutdown.store(true, Ordering::SeqCst);
        result
    }
520
    /// Check whether the agent is currently idle (not running a turn).
    ///
    /// Forwards to [`Runtime::is_idle`], whose default implementation always
    /// reports `Ok(true)`.
    ///
    /// # Errors
    ///
    /// Returns [`Error`] if the query fails.
    pub async fn is_idle(&self) -> Result<bool, Error> {
        self.runtime.is_idle(self.id).await
    }
529
530    /// Return the structured output from the last chat response, if any.
531    ///
532    /// Only populated after a [`chat()`](Self::chat) round-trip when the
533    /// agent was configured with a `response_schema` and the model returned
534    /// a valid JSON payload.
535    #[must_use]
536    pub fn get_last_structured_output(&self) -> Option<serde_json::Value> {
537        let guard = self
538            .last_shared_state
539            .lock()
540            .inspect_err(|e| {
541                tracing::error!(
542                    agent_id = self.id,
543                    error = %e,
544                    "last_shared_state mutex poisoned in get_last_structured_output"
545                );
546            })
547            .ok()?;
548        let state = guard
549            .as_ref()?
550            .lock()
551            .inspect_err(|e| {
552                tracing::error!(
553                    agent_id = self.id,
554                    error = %e,
555                    "ChatResponseSharedState mutex poisoned in get_last_structured_output"
556                );
557            })
558            .ok()?;
559        state.structured_output.clone()
560    }
561
562    /// Return the usage metadata from the last chat response, if any.
563    #[must_use]
564    pub fn get_last_usage(&self) -> Option<UsageMetadata> {
565        let guard = self
566            .last_shared_state
567            .lock()
568            .inspect_err(|e| {
569                tracing::error!(
570                    agent_id = self.id,
571                    error = %e,
572                    "last_shared_state mutex poisoned in get_last_usage"
573                );
574            })
575            .ok()?;
576        let state = guard
577            .as_ref()?
578            .lock()
579            .inspect_err(|e| {
580                tracing::error!(
581                    agent_id = self.id,
582                    error = %e,
583                    "ChatResponseSharedState mutex poisoned in get_last_usage"
584                );
585            })
586            .ok()?;
587        state.usage.clone()
588    }
589
590    /// Send a message without waiting for a response.
591    ///
592    /// Fire-and-forget: the message is delivered to the agent but no
593    /// streaming response is produced.
594    ///
595    /// # Errors
596    ///
597    /// Returns a [`Error`] if sending fails.
598    pub async fn send(&self, content: impl Into<Content>) -> Result<(), Error> {
599        if !self.is_started() {
600            return Err(Error::AgentNotStarted);
601        }
602
603        let content = content.into();
604
605        for attempt in 0..=Self::MAX_QUOTA_RETRIES {
606            self.quota_state.wait_for_quota().await;
607            match self.runtime.send(self.id, &content).await {
608                Ok(()) => return Ok(()),
609                Err(Error::QuotaExceeded { retry_after }) => {
610                    self.handle_quota_error("send", attempt, retry_after)?;
611                }
612                Err(ref e) if e.is_quota_error() => {
613                    self.handle_quota_error("send", attempt, DEFAULT_QUOTA_BACKOFF)?;
614                }
615                Err(e) => return Err(e),
616            }
617        }
618        Err(Error::QuotaExceeded {
619            retry_after: QUOTA_EXHAUSTED_RETRY_AFTER,
620        })
621    }
622
    /// Signal that this agent is idle and ready to receive input.
    ///
    /// Forwards to [`Runtime::signal_idle`] for this agent.
    ///
    /// # Errors
    ///
    /// Returns a [`Error`] if the signal call fails.
    pub async fn signal_idle(&self) -> Result<(), Error> {
        self.runtime.signal_idle(self.id).await
    }
631
    /// Wait for the agent to wake up, returning `true` if woken or
    /// `false` if the `timeout` elapsed.
    ///
    /// Forwards to [`Runtime::wait_for_wakeup`] for this agent; the timeout
    /// is passed through unchanged and enforced by the runtime.
    ///
    /// # Errors
    ///
    /// Returns a [`Error`] if the wait call fails.
    pub async fn wait_for_wakeup(&self, timeout: std::time::Duration) -> Result<bool, Error> {
        self.runtime.wait_for_wakeup(self.id, timeout).await
    }
641
642    /// Maximum number of quota retry attempts before giving up.
643    const MAX_QUOTA_RETRIES: u32 = 120;
644
645    /// Handle a quota/429 error from a retryable operation.
646    ///
647    /// Returns `Ok(())` if the caller should retry, or `Err` if retries
648    /// are exhausted.
649    fn handle_quota_error(
650        &self,
651        operation: &str,
652        attempt: u32,
653        retry_after: std::time::Duration,
654    ) -> Result<(), Error> {
655        if attempt == Self::MAX_QUOTA_RETRIES {
656            return Err(Error::QuotaExceeded { retry_after });
657        }
658        tracing::warn!(
659            agent_id = self.id,
660            attempt = attempt + 1,
661            max = Self::MAX_QUOTA_RETRIES,
662            retry_after_ms = u64::try_from(retry_after.as_millis()).unwrap_or_else(|e| {
663                tracing::warn!("Int conversion failed: {e}");
664                u64::MAX
665            }),
666            "Quota exceeded on {operation} — recording hit and retrying"
667        );
668        self.quota_state.record_quota_hit(retry_after);
669        Ok(())
670    }
671
672    /// Gracefully shut down the agent.
673    ///
674    /// This sends a `ShutdownAgent` command to the Python runtime, which
675    /// calls `__aexit__()` on the SDK agent. The handle remains usable
676    /// for read-only queries (e.g. [`is_started()`](Self::is_started))
677    /// after shutdown.
678    ///
679    /// # Errors
680    ///
681    /// Returns a [`Error`] if shutdown fails. The `is_shutdown`
682    /// flag is always set so the `Drop` impl will not emit a warning.
683    pub async fn shutdown(&self) -> Result<(), Error> {
684        if self.is_shutdown.load(Ordering::SeqCst) {
685            tracing::debug!(agent_id = self.id, "Agent already shut down");
686            return Ok(());
687        }
688
689        tracing::info!(agent_id = self.id, "Shutting down agent");
690        let result = self.runtime.shutdown_agent(self.id).await;
691
692        // Always mark as shut down so Drop doesn't warn, even on failure.
693        self.is_shutdown.store(true, Ordering::SeqCst);
694
695        match result {
696            Ok(()) => {
697                tracing::info!(agent_id = self.id, "Agent shut down successfully");
698            }
699            Err(ref e) => {
700                tracing::error!(agent_id = self.id, error = ?e, "Agent shutdown failed");
701            }
702        }
703
704        result
705    }
706
707    /// Spawn a subagent from the given config, sharing this agent's runtime.
708    ///
709    /// If a `ToolRegistry` is provided and `config.tools` is empty, the
710    /// registry's definitions are automatically applied.
711    ///
712    /// # Errors
713    ///
714    /// Returns a [`Error`] if agent creation fails.
715    pub async fn spawn_subagent(
716        &self,
717        mut config: AgentConfig,
718        registry: impl Into<Option<crate::tools::ToolRegistry>>,
719    ) -> Result<Self, Error> {
720        let opt_registry = registry.into();
721        if let Some(disp) = &opt_registry
722            && config.tools.is_empty()
723        {
724            config.tools = disp.definitions();
725        }
726        let arc_registry = opt_registry.map(Arc::new);
727        Self::new(
728            Arc::clone(&self.runtime),
729            config,
730            arc_registry,
731            None,
732            self.policy_handler.clone(),
733        )
734        .await
735    }
736}
737
738impl<R: Runtime> Drop for AgentHandle<R> {
739    fn drop(&mut self) {
740        if self.is_started.load(Ordering::SeqCst) && !self.is_shutdown.load(Ordering::SeqCst) {
741            tracing::debug!(
742                agent_id = self.id,
743                "AgentHandle dropped without explicit shutdown() — \
744                 sending best-effort shutdown signal"
745            );
746            self.runtime.try_shutdown_agent(self.id);
747        }
748
749        // Clean up global bridge state entry.
750        if let Ok(mut map) = crate::runtime::bridge_state().write() {
751            map.remove(&self.id);
752        } else {
753            tracing::error!(
754                agent_id = self.id,
755                "BRIDGE_STATE RwLock poisoned during Drop — \
756                 bridge state entry for this agent may leak"
757            );
758        }
759    }
760}