Skip to main content

agy_bridge/agent/
mod.rs

//! Agent lifecycle management for the Antigravity SDK bridge.
//!
//! Provides [`AgentHandle`](crate::agent::AgentHandle), which wraps the lifecycle of a single
//! SDK agent: creation, chatting, conversation tracking, and shutdown — either explicitly via
//! `shutdown()` or as a best-effort signal sent when the handle is dropped.

6use std::sync::{
7    Arc, Mutex,
8    atomic::{AtomicBool, Ordering},
9};
10
11use crate::{
12    config::AgentConfig,
13    content::Content,
14    error::Error,
15    streaming::{ChatResponseHandle, ChatResponseSharedState},
16    types::{ConversationMessage, UsageMetadata},
17};
18
/// Backoff applied when a quota (HTTP 429) error arrives without a
/// `Retry-After` hint.
const DEFAULT_QUOTA_BACKOFF: std::time::Duration = std::time::Duration::from_secs(2);

/// `retry_after` value surfaced to the caller once every quota retry has
/// been used up (two minutes).
const QUOTA_EXHAUSTED_RETRY_AFTER: std::time::Duration = std::time::Duration::from_secs(2 * 60);

/// Identifier the bridge assigns to each agent; unique within the bridge.
pub type AgentId = u64;

// Test-only module (`mock.rs`), compiled only under `cfg(test)`.
#[cfg(test)]
pub(crate) mod mock;

32/// Trait abstracting the Python runtime interface.
33///
34/// This allows unit tests to inject a mock runtime without requiring a live
35/// Python interpreter. The real implementation will call through to `PyO3`.
36#[expect(
37    async_fn_in_trait,
38    reason = "Runtime is not object-safe by design; callers always know the concrete type"
39)]
40pub trait Runtime: Send + Sync {
41    /// Create an agent from the given config, returning its ID.
42    async fn create_agent(&self, config: AgentConfig) -> Result<AgentId, Error>;
43
44    /// Send a chat message to the agent, returning a streaming response handle.
45    ///
46    /// The `content` parameter accepts any [`Content`] variant: plain text,
47    /// images, documents, audio, video, or a multi-part list.
48    async fn chat(&self, agent_id: AgentId, content: &Content)
49    -> Result<ChatResponseHandle, Error>;
50
51    /// Gracefully shut down the agent.
52    async fn shutdown_agent(&self, agent_id: AgentId) -> Result<(), Error>;
53
54    /// Interrupt any active prompt/chat run.
55    async fn cancel(&self, agent_id: AgentId) -> Result<(), Error>;
56
57    /// Wait for the active run or conversational loop to stabilize.
58    async fn wait_for_idle(&self, agent_id: AgentId) -> Result<(), Error>;
59
60    /// Send a message without waiting for completion.
61    async fn send(&self, agent_id: AgentId, content: &Content) -> Result<(), Error>;
62
63    /// Signal that the agent is idle.
64    async fn signal_idle(&self, agent_id: AgentId) -> Result<(), Error>;
65
66    /// Wait for the agent to wake up. Returns true if woken, false if timed out.
67    async fn wait_for_wakeup(
68        &self,
69        agent_id: AgentId,
70        timeout: std::time::Duration,
71    ) -> Result<bool, Error>;
72
73    /// Wait if we're in a quota backoff period.
74    async fn wait_for_quota(&self);
75
76    /// Record a quota hit with the suggested retry duration.
77    async fn record_quota_hit(&self, retry_after: std::time::Duration);
78
79    /// Access this runtime's per-key quota registry.
80    ///
81    /// Each runtime owns its own [`QuotaRegistry`](crate::quota::QuotaRegistry),
82    /// so different runtimes have fully independent quota tracking.
83    fn quota_registry(&self) -> &crate::quota::QuotaRegistry;
84
85    /// Retrieve the conversation's message history.
86    async fn history(&self, agent_id: AgentId) -> Result<Vec<ConversationMessage>, Error>;
87
88    /// Return the number of completed turns in the conversation.
89    async fn turn_count(&self, agent_id: AgentId) -> Result<u32, Error>;
90
91    /// Return cumulative token usage across all turns.
92    async fn total_usage(&self, agent_id: AgentId) -> Result<UsageMetadata, Error>;
93
94    /// Return token usage from the most recent turn only.
95    async fn last_turn_usage(&self, agent_id: AgentId) -> Result<UsageMetadata, Error>;
96
97    /// Clear the conversation history and reset state.
98    async fn clear_history(&self, agent_id: AgentId) -> Result<(), Error>;
99
100    /// Return the text of the last model response, if any.
101    ///
102    /// Default implementation returns `Ok(None)`.
103    async fn last_response(&self, _agent_id: AgentId) -> Result<Option<String>, Error> {
104        Ok(None)
105    }
106
107    /// Return the step indices at which compaction occurred.
108    ///
109    /// Default implementation returns an empty list.
110    async fn compaction_indices(&self, _agent_id: AgentId) -> Result<Vec<u32>, Error> {
111        Ok(Vec::new())
112    }
113
114    /// Delete the conversation and all associated state.
115    ///
116    /// Default implementation is a no-op that returns `Ok(())`.
117    async fn delete(&self, _agent_id: AgentId) -> Result<(), Error> {
118        Ok(())
119    }
120
121    /// Disconnect from the agent without deleting state.
122    ///
123    /// Default implementation is a no-op that returns `Ok(())`.
124    async fn disconnect(&self, _agent_id: AgentId) -> Result<(), Error> {
125        Ok(())
126    }
127
128    /// Check whether the agent is currently idle (not running a turn).
129    ///
130    /// Default implementation returns `Ok(true)`.
131    async fn is_idle(&self, _agent_id: AgentId) -> Result<bool, Error> {
132        Ok(true)
133    }
134
135    /// Best-effort synchronous shutdown signal, called from [`Drop`].
136    ///
137    /// Unlike [`shutdown_agent`](Self::shutdown_agent), this is sync and
138    /// fire-and-forget — it cannot return errors. The default is a no-op;
139    /// implementations backed by a command channel should `try_send` a
140    /// shutdown command here.
141    fn try_shutdown_agent(&self, _agent_id: AgentId) {}
142}
143
144/// Handle to a running agent.
145///
146/// Wraps the agent's lifecycle: creation, chat, and shutdown.
147///
148/// Call [`shutdown()`](Self::shutdown) for a clean, error-reported shutdown.
149/// If the handle is dropped without calling `shutdown()`, a best-effort
150/// background shutdown is spawned via [`tokio::spawn`] — the Python agent
151/// will be cleaned up, but errors are only logged, not returned.
152///
153/// Most methods take `&self` — interior mutability is used where needed
154/// so multiple concurrent operations can share a single handle.
155///
156/// # Mutex choice
157///
158/// This type uses [`std::sync::Mutex`] rather than [`tokio::sync::Mutex`]
159/// because every lock acquisition is a brief, synchronous operation (pointer
160/// swap or clone) that **never** spans an `.await` point. For these
161/// microsecond critical sections, `std::sync::Mutex` is both simpler and
162/// lower-overhead than the async alternative.
163pub struct AgentHandle<R: Runtime + 'static> {
164    id: AgentId,
165    runtime: Arc<R>,
166    config: AgentConfig,
167    /// Per-API-key quota state. Agents sharing the same effective API key
168    /// share backoff tracking; agents with different keys are independent.
169    quota_state: Arc<crate::quota::QuotaState>,
170    /// Kept alive for the agent's lifetime so the global `BRIDGE_STATE`
171    /// entry isn't the only strong reference.
172    _registry: Option<Arc<crate::tools::ToolRegistry>>,
173    /// Kept alive to preserve a strong reference to the policy confirmation handler.
174    policy_handler: Option<Arc<dyn crate::policies::AskUserHandler>>,
175    conversation_id: Arc<Mutex<Option<String>>>,
176    is_started: AtomicBool,
177    is_shutdown: AtomicBool,
178    /// Shared state from the last completed chat response, used to surface
179    /// `get_last_structured_output()` without round-tripping to Python.
180    ///
181    /// Wrapped in a `Mutex` so `chat()` can take `&self` instead of `&mut self`,
182    /// enabling concurrent usage patterns. The lock is brief (pointer swap only).
183    last_shared_state: Mutex<Option<Arc<Mutex<ChatResponseSharedState>>>>,
184}
185
186impl<R: Runtime> AgentHandle<R> {
187    /// Create a new agent from the given runtime and configuration.
188    ///
189    /// This sends a `CreateAgent` command to the Python runtime, waits for
190    /// quota availability, and returns the handle.
191    ///
192    /// # Errors
193    ///
194    /// Returns a [`Error`] if agent creation fails (e.g. invalid config,
195    /// Python error, or quota exceeded).
196    #[expect(
197        clippy::too_many_lines,
198        reason = "Agent initialization is a sequential multi-step setup"
199    )]
200    pub async fn new(
201        runtime: Arc<R>,
202        config: AgentConfig,
203        registry: Option<Arc<crate::tools::ToolRegistry>>,
204        hook_runner: Option<Arc<crate::hooks::Hooks>>,
205        policy_handler: Option<Arc<dyn crate::policies::AskUserHandler>>,
206    ) -> Result<Self, Error> {
207        let quota_key = config.effective_api_key().unwrap_or_default();
208        let quota_state = runtime.quota_registry().state_for_key(&quota_key);
209
210        let mut config = config;
211        let has_session_start = config
212            .hooks
213            .iter()
214            .any(|h| h.point == crate::hooks::HookPoint::OnSessionStart);
215        if !has_session_start {
216            config.hooks.push(crate::hooks::HookEntry {
217                name: "builtin_session_sync".to_string(),
218                point: crate::hooks::HookPoint::OnSessionStart,
219                callback_id: "builtin_session_sync".to_string(),
220            });
221        }
222
223        // We must hold CREATE_AGENT_HOOK_GUARD across both agent creation and
224        // bridge_state insertion. This prevents a race where an asynchronous hook
225        // (like on_session_start) fires in Python after create_agent returns but
226        // before the agent is inserted into bridge_state.
227        let _guard = crate::runtime::CREATE_AGENT_HOOK_GUARD.lock().await;
228
229        let effective_hook_runner =
230            hook_runner.unwrap_or_else(|| Arc::new(crate::hooks::Hooks::new()));
231
232        match crate::runtime::INITIALIZING_HOOK_RUNNER.lock() {
233            Ok(mut opt) => {
234                *opt = Some(Arc::clone(&effective_hook_runner));
235            }
236            Err(e) => {
237                return Err(Error::BackendError {
238                    message: format!(
239                        "INITIALIZING_HOOK_RUNNER mutex poisoned — hooks cannot be installed: {e}"
240                    ),
241                });
242            }
243        }
244
245        let id = runtime.create_agent(config.clone()).await?;
246        tracing::info!(agent_id = id, "Agent created successfully");
247
248        // Build and insert per-agent bridge state in a single lock acquisition.
249        let policies_set = crate::policies::PolicySet::validated_from(config.policies.clone())?;
250        let mut initial_conv_id = config.conversation_id.clone();
251        if initial_conv_id.is_none()
252            && let Ok(mut guard) = crate::runtime::PENDING_CONVERSATION_IDS.lock()
253            && let Some(pending_id) = guard.remove(&id)
254        {
255            initial_conv_id = Some(pending_id);
256        }
257        let conversation_id = Arc::new(Mutex::new(initial_conv_id));
258        let bridge_entry = crate::runtime::AgentBridgeState {
259            registry: registry.as_ref().map(Arc::clone),
260            hook_runner: Some(effective_hook_runner),
261            policies: policies_set,
262            policy_handler: policy_handler.as_ref().map(Arc::clone),
263            tool_state: Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())),
264            conversation_id: Arc::clone(&conversation_id),
265        };
266        let bridge_insert_failed = match crate::runtime::bridge_state().write() {
267            Ok(mut map) => {
268                map.insert(id, bridge_entry);
269                false
270            }
271            Err(e) => {
272                tracing::error!(
273                    agent_id = id,
274                    error = %e,
275                    "Failed to acquire write lock on BRIDGE_STATE — agent would be unusable"
276                );
277                true
278            }
279        };
280        // Guard is dropped here — safe to .await below.
281        if bridge_insert_failed {
282            // Best-effort shutdown the agent we just created before returning the error.
283            if let Err(shutdown_err) = runtime.shutdown_agent(id).await {
284                tracing::error!(
285                    agent_id = id,
286                    error = ?shutdown_err,
287                    "Failed to shut down agent after BRIDGE_STATE lock failure"
288                );
289            }
290            return Err(Error::BackendError {
291                message: "BRIDGE_STATE RwLock poisoned during agent creation".to_string(),
292            });
293        }
294
295        match crate::runtime::INITIALIZING_HOOK_RUNNER.lock() {
296            Ok(mut opt) => {
297                *opt = None;
298            }
299            Err(e) => {
300                // The agent is already created at this point. Log at error
301                // level — a stale hook runner may cause the next agent
302                // creation to pick up the wrong hooks, but the current
303                // agent is functional.
304                tracing::error!(
305                    agent_id = id,
306                    error = %e,
307                    "INITIALIZING_HOOK_RUNNER mutex poisoned during cleanup — \
308                     stale hook runner may persist"
309                );
310            }
311        }
312
313        Ok(Self {
314            id,
315            runtime,
316            config,
317            quota_state,
318            _registry: registry,
319            policy_handler,
320            conversation_id,
321            is_started: AtomicBool::new(true),
322            is_shutdown: AtomicBool::new(false),
323            last_shared_state: Mutex::new(None),
324        })
325    }
326
327    /// Send a message and receive a streaming response.
328    ///
329    /// Accepts any type that converts into [`Content`]: `&str`, `String`,
330    /// [`Image`](crate::content::Image), [`Document`](crate::content::Document),
331    /// [`Audio`](crate::content::Audio), [`Video`](crate::content::Video), or a
332    /// `Vec<ContentPrimitive>` for multimodal input.
333    ///
334    /// Automatically backs off on quota limits (HTTP 429).
335    ///
336    /// # Errors
337    ///
338    /// Returns a [`Error`] on chat failure (Python error, timeout, etc.).
339    pub async fn chat(&self, content: impl Into<Content>) -> Result<ChatResponseHandle, Error> {
340        if !self.is_started() {
341            return Err(Error::AgentNotStarted);
342        }
343
344        let content = content.into();
345        let max_retries = self.config.max_quota_retries.unwrap_or(0);
346
347        let handle = 'retry: {
348            for attempt in 0..=max_retries {
349                if attempt > 0 {
350                    self.quota_state.wait_for_quota().await;
351                }
352                match self.runtime.chat(self.id, &content).await {
353                    Ok(h) => break 'retry h,
354                    Err(Error::QuotaExceeded { retry_after }) => {
355                        self.handle_quota_error("chat", attempt, max_retries, retry_after)?;
356                    }
357                    Err(ref e) if e.is_quota_error() => {
358                        self.handle_quota_error(
359                            "chat",
360                            attempt,
361                            max_retries,
362                            DEFAULT_QUOTA_BACKOFF,
363                        )?;
364                    }
365                    Err(e) => return Err(e),
366                }
367            }
368            return Err(Error::QuotaExceeded {
369                retry_after: QUOTA_EXHAUSTED_RETRY_AFTER,
370            });
371        };
372
373        match self.last_shared_state.lock() {
374            Ok(mut guard) => {
375                *guard = Some(Arc::clone(&handle.shared_state));
376            }
377            Err(e) => {
378                tracing::error!(
379                    agent_id = self.id,
380                    error = %e,
381                    "last_shared_state mutex poisoned — streaming metadata may be stale"
382                );
383            }
384        }
385        Ok(handle)
386    }
387
388    /// Send a message and return the final text response.
389    ///
390    /// This is a convenience wrapper around [`chat`](Self::chat) that drains
391    /// the streaming response into a single `String`. If tools were associated
392    /// with the agent at creation time, the Python runtime handles tool
393    /// execution automatically.
394    ///
395    /// # Errors
396    ///
397    /// Returns [`Error`] if the chat turn fails or stream errors occur.
398    pub async fn chat_text(&self, message: impl Into<Content>) -> Result<String, Error> {
399        let response = self.chat(message.into()).await?;
400        let text = response.text().await.map_err(|e| {
401            let converted = Error::from(e);
402            if matches!(converted, Error::Safety) {
403                converted
404            } else {
405                Error::BackendError {
406                    message: format!("Failed to read response text: {converted}"),
407                }
408            }
409        })?;
410        Ok(text.into_string())
411    }
412
413    /// Return the current conversation ID, if one has been set.
414    ///
415    /// Returns a cloned `String` because the underlying value is behind a
416    /// [`Mutex`] (interior mutability for `&self` access).
417    #[must_use]
418    pub fn conversation_id(&self) -> Option<String> {
419        self.conversation_id
420            .lock()
421            .inspect_err(|e| {
422                tracing::error!(
423                    agent_id = self.id,
424                    error = %e,
425                    "conversation_id mutex poisoned"
426                );
427            })
428            .ok()
429            .and_then(|guard| guard.clone())
430    }
431
432    /// Set the conversation ID (called when the SDK assigns one).
433    ///
434    /// Takes `&self` rather than `&mut self` so the handle can be shared
435    /// across concurrent tasks.
436    pub fn set_conversation_id(&self, id: String) {
437        match self.conversation_id.lock() {
438            Ok(mut guard) => {
439                *guard = Some(id);
440            }
441            Err(e) => {
442                tracing::error!(
443                    agent_id = self.id,
444                    error = %e,
445                    "conversation_id mutex poisoned — ID will not be updated"
446                );
447            }
448        }
449    }
450
451    /// Check whether the agent has been started and is not yet shut down.
452    #[must_use]
453    pub fn is_started(&self) -> bool {
454        self.is_started.load(Ordering::SeqCst) && !self.is_shutdown.load(Ordering::SeqCst)
455    }
456
457    /// Return the agent's unique identifier.
458    #[must_use]
459    pub const fn id(&self) -> AgentId {
460        self.id
461    }
462
463    /// Return a reference to the agent's configuration.
464    #[must_use]
465    pub const fn config(&self) -> &AgentConfig {
466        &self.config
467    }
468
469    /// Interrupt the active chat prompt execution.
470    ///
471    /// # Errors
472    ///
473    /// Returns a [`Error`] if the cancellation call fails.
474    pub async fn cancel(&self) -> Result<(), Error> {
475        self.runtime.cancel(self.id).await
476    }
477
478    /// Wait for the conversation or active run to stabilize and become idle.
479    ///
480    /// # Errors
481    ///
482    /// Returns a [`Error`] if the wait call fails.
483    pub async fn wait_for_idle(&self) -> Result<(), Error> {
484        self.runtime.wait_for_idle(self.id).await
485    }
486
487    /// Retrieve the conversation's message history.
488    ///
489    /// # Errors
490    ///
491    /// Returns [`Error`] if the query fails.
492    pub async fn history(&self) -> Result<Vec<ConversationMessage>, Error> {
493        self.runtime.history(self.id).await
494    }
495
496    /// Return the number of completed turns in the conversation.
497    ///
498    /// # Errors
499    ///
500    /// Returns [`Error`] if the query fails.
501    pub async fn turn_count(&self) -> Result<u32, Error> {
502        self.runtime.turn_count(self.id).await
503    }
504
505    /// Return cumulative token usage across all turns.
506    ///
507    /// # Errors
508    ///
509    /// Returns [`Error`] if the query fails.
510    pub async fn total_usage(&self) -> Result<UsageMetadata, Error> {
511        self.runtime.total_usage(self.id).await
512    }
513
514    /// Return token usage from the most recent turn only.
515    ///
516    /// # Errors
517    ///
518    /// Returns [`Error`] if the query fails.
519    pub async fn last_turn_usage(&self) -> Result<UsageMetadata, Error> {
520        self.runtime.last_turn_usage(self.id).await
521    }
522
523    /// Clear the conversation history and reset state.
524    ///
525    /// # Errors
526    ///
527    /// Returns [`Error`] if the operation fails.
528    pub async fn clear_history(&self) -> Result<(), Error> {
529        self.runtime.clear_history(self.id).await
530    }
531
532    /// Return the text of the last model response, if any.
533    ///
534    /// # Errors
535    ///
536    /// Returns [`Error`] if the query fails.
537    pub async fn last_response(&self) -> Result<Option<String>, Error> {
538        self.runtime.last_response(self.id).await
539    }
540
541    /// Return the step indices at which conversation compaction occurred.
542    ///
543    /// # Errors
544    ///
545    /// Returns [`Error`] if the query fails.
546    pub async fn compaction_indices(&self) -> Result<Vec<u32>, Error> {
547        self.runtime.compaction_indices(self.id).await
548    }
549
550    /// Delete the conversation and all associated state.
551    ///
552    /// After calling this method, the agent handle is no longer usable
553    /// for chat operations. This also marks the agent as shut down.
554    ///
555    /// # Errors
556    ///
557    /// Returns [`Error`] if the delete operation fails.
558    pub async fn delete(&self) -> Result<(), Error> {
559        let result = self.runtime.delete(self.id).await;
560        self.is_shutdown.store(true, Ordering::SeqCst);
561        result
562    }
563
564    /// Disconnect from the agent without deleting its state.
565    ///
566    /// The agent's conversation state is preserved but this handle
567    /// can no longer send messages. Marks the agent as shut down.
568    ///
569    /// # Errors
570    ///
571    /// Returns [`Error`] if the disconnect operation fails.
572    pub async fn disconnect(&self) -> Result<(), Error> {
573        let result = self.runtime.disconnect(self.id).await;
574        self.is_shutdown.store(true, Ordering::SeqCst);
575        result
576    }
577
578    /// Check whether the agent is currently idle (not running a turn).
579    ///
580    /// # Errors
581    ///
582    /// Returns [`Error`] if the query fails.
583    pub async fn is_idle(&self) -> Result<bool, Error> {
584        self.runtime.is_idle(self.id).await
585    }
586
587    /// Return the structured output from the last chat response, if any.
588    ///
589    /// Only populated after a [`chat()`](Self::chat) round-trip when the
590    /// agent was configured with a `response_schema` and the model returned
591    /// a valid JSON payload.
592    #[must_use]
593    pub fn get_last_structured_output(&self) -> Option<serde_json::Value> {
594        let guard = self
595            .last_shared_state
596            .lock()
597            .inspect_err(|e| {
598                tracing::error!(
599                    agent_id = self.id,
600                    error = %e,
601                    "last_shared_state mutex poisoned in get_last_structured_output"
602                );
603            })
604            .ok()?;
605        let state = guard
606            .as_ref()?
607            .lock()
608            .inspect_err(|e| {
609                tracing::error!(
610                    agent_id = self.id,
611                    error = %e,
612                    "ChatResponseSharedState mutex poisoned in get_last_structured_output"
613                );
614            })
615            .ok()?;
616        state.structured_output.clone()
617    }
618
619    /// Return the structured output from the last chat response deserialized into `T`.
620    ///
621    /// Returns `None` if there was no structured output on the last response.
622    /// Returns `Some(Err(...))` if the structured output could not be deserialized as `T`.
623    pub fn get_last_structured_output_as<T: serde::de::DeserializeOwned>(
624        &self,
625    ) -> Option<Result<T, serde_json::Error>> {
626        self.get_last_structured_output()
627            .map(serde_json::from_value)
628    }
629
630    /// Return the usage metadata from the last chat response, if any.
631    #[must_use]
632    pub fn get_last_usage(&self) -> Option<UsageMetadata> {
633        let guard = self
634            .last_shared_state
635            .lock()
636            .inspect_err(|e| {
637                tracing::error!(
638                    agent_id = self.id,
639                    error = %e,
640                    "last_shared_state mutex poisoned in get_last_usage"
641                );
642            })
643            .ok()?;
644        let state = guard
645            .as_ref()?
646            .lock()
647            .inspect_err(|e| {
648                tracing::error!(
649                    agent_id = self.id,
650                    error = %e,
651                    "ChatResponseSharedState mutex poisoned in get_last_usage"
652                );
653            })
654            .ok()?;
655        state.usage.clone()
656    }
657
658    /// Send a message without waiting for a response.
659    ///
660    /// Fire-and-forget: the message is delivered to the agent but no
661    /// streaming response is produced.
662    ///
663    /// # Errors
664    ///
665    /// Returns a [`Error`] if sending fails.
666    pub async fn send(&self, content: impl Into<Content>) -> Result<(), Error> {
667        if !self.is_started() {
668            return Err(Error::AgentNotStarted);
669        }
670
671        let content = content.into();
672
673        let max_retries = self.config.max_quota_retries.unwrap_or(0);
674
675        for attempt in 0..=max_retries {
676            if attempt > 0 {
677                self.quota_state.wait_for_quota().await;
678            }
679            match self.runtime.send(self.id, &content).await {
680                Ok(()) => return Ok(()),
681                Err(Error::QuotaExceeded { retry_after }) => {
682                    self.handle_quota_error("send", attempt, max_retries, retry_after)?;
683                }
684                Err(ref e) if e.is_quota_error() => {
685                    self.handle_quota_error("send", attempt, max_retries, DEFAULT_QUOTA_BACKOFF)?;
686                }
687                Err(e) => return Err(e),
688            }
689        }
690        Err(Error::QuotaExceeded {
691            retry_after: QUOTA_EXHAUSTED_RETRY_AFTER,
692        })
693    }
694
695    /// Signal that this agent is idle and ready to receive input.
696    ///
697    /// # Errors
698    ///
699    /// Returns a [`Error`] if the signal call fails.
700    pub async fn signal_idle(&self) -> Result<(), Error> {
701        self.runtime.signal_idle(self.id).await
702    }
703
704    /// Wait for the agent to wake up, returning `true` if woken or
705    /// `false` if the `timeout` elapsed.
706    ///
707    /// # Errors
708    ///
709    /// Returns a [`Error`] if the wait call fails.
710    pub async fn wait_for_wakeup(&self, timeout: std::time::Duration) -> Result<bool, Error> {
711        self.runtime.wait_for_wakeup(self.id, timeout).await
712    }
713
714    /// Handle a quota/429 error from a retryable operation.
715    fn handle_quota_error(
716        &self,
717        operation: &str,
718        attempt: u32,
719        max_retries: u32,
720        retry_after: std::time::Duration,
721    ) -> Result<(), Error> {
722        if attempt >= max_retries {
723            return Err(Error::QuotaExceeded { retry_after });
724        }
725        tracing::warn!(
726            agent_id = self.id,
727            attempt = attempt + 1,
728            max = max_retries,
729            retry_after_ms = u64::try_from(retry_after.as_millis()).unwrap_or_else(|e| {
730                tracing::warn!("Int conversion failed: {e}");
731                u64::MAX
732            }),
733            "Quota exceeded on {operation} — recording hit and retrying"
734        );
735        self.quota_state.record_quota_hit(retry_after);
736        Ok(())
737    }
738
739    /// Gracefully shut down the agent.
740    ///
741    /// This sends a `ShutdownAgent` command to the Python runtime, which
742    /// calls `__aexit__()` on the SDK agent. The handle remains usable
743    /// for read-only queries (e.g. [`is_started()`](Self::is_started))
744    /// after shutdown.
745    ///
746    /// # Errors
747    ///
748    /// Returns a [`Error`] if shutdown fails. The `is_shutdown`
749    /// flag is always set so the `Drop` impl will not emit a warning.
750    pub async fn shutdown(&self) -> Result<(), Error> {
751        if self.is_shutdown.load(Ordering::SeqCst) {
752            tracing::debug!(agent_id = self.id, "Agent already shut down");
753            return Ok(());
754        }
755
756        tracing::info!(agent_id = self.id, "Shutting down agent");
757        let result = self.runtime.shutdown_agent(self.id).await;
758
759        // Always mark as shut down so Drop doesn't warn, even on failure.
760        self.is_shutdown.store(true, Ordering::SeqCst);
761
762        // Clean up bridge state AFTER the runtime's shutdown completes.
763        // In the live runtime, `__aexit__` fires hooks (e.g. on_session_end)
764        // that look up bridge state — so this must happen after, not before.
765        match crate::runtime::bridge_state().write() {
766            Ok(mut map) => {
767                map.remove(&self.id);
768            }
769            Err(e) => {
770                tracing::error!(
771                    agent_id = self.id,
772                    error = %e,
773                    "BRIDGE_STATE RwLock poisoned during shutdown cleanup — \
774                     bridge state entry may leak"
775                );
776            }
777        }
778
779        match result {
780            Ok(()) => {
781                tracing::info!(agent_id = self.id, "Agent shut down successfully");
782            }
783            Err(ref e) => {
784                tracing::error!(agent_id = self.id, error = ?e, "Agent shutdown failed");
785            }
786        }
787
788        result
789    }
790
791    /// Spawn a subagent from the given config, sharing this agent's runtime.
792    ///
793    /// If a `ToolRegistry` is provided and `config.tools` is empty, the
794    /// registry's definitions are automatically applied.
795    ///
796    /// # Errors
797    ///
798    /// Returns a [`Error`] if agent creation fails.
799    pub async fn spawn_subagent(
800        &self,
801        mut config: AgentConfig,
802        registry: impl Into<Option<crate::tools::ToolRegistry>>,
803    ) -> Result<Self, Error> {
804        let opt_registry = registry.into();
805        if let Some(disp) = &opt_registry
806            && config.tools.is_empty()
807        {
808            config.tools = disp.definitions();
809        }
810        let arc_registry = opt_registry.map(Arc::new);
811        Self::new(
812            Arc::clone(&self.runtime),
813            config,
814            arc_registry,
815            None,
816            self.policy_handler.clone(),
817        )
818        .await
819    }
820}
821
822impl<R: Runtime> Drop for AgentHandle<R> {
823    fn drop(&mut self) {
824        if self.is_started.load(Ordering::SeqCst) && !self.is_shutdown.load(Ordering::SeqCst) {
825            tracing::debug!(
826                agent_id = self.id,
827                "AgentHandle dropped without explicit shutdown() — \
828                 sending best-effort shutdown signal"
829            );
830            // try_shutdown_agent fires a command that eventually calls
831            // handle_shutdown_agent, which cleans up bridge state AFTER
832            // __aexit__ completes (so on_session_end hooks can still
833            // find the hook runner). Do NOT clean up bridge state here.
834            self.runtime.try_shutdown_agent(self.id);
835        } else if self.is_shutdown.load(Ordering::SeqCst) {
836            // shutdown() was already called — handle_shutdown_agent
837            // already cleaned up bridge state after __aexit__. Nothing
838            // to do.
839        } else {
840            // Agent was never started (e.g. creation failed). Clean up
841            // any partial bridge state that might have been registered.
842            if let Ok(mut map) = crate::runtime::bridge_state().write() {
843                map.remove(&self.id);
844            } else {
845                tracing::error!(
846                    agent_id = self.id,
847                    "BRIDGE_STATE RwLock poisoned during Drop — \
848                     bridge state entry for this agent may leak"
849                );
850            }
851        }
852    }
853}