Skip to main content

agy_bridge/agent/
mod.rs

1//! Agent lifecycle management for the Antigravity SDK bridge.
2//!
3//! Provides [`AgentHandle`](crate::agent::AgentHandle) which wraps the lifecycle of a single SDK agent:
4//! creation, chatting, conversation tracking, and shutdown with RAII warnings.
5
6use std::sync::{
7    Arc, Mutex,
8    atomic::{AtomicBool, Ordering},
9};
10
11use crate::{
12    config::AgentConfig,
13    content::Content,
14    error::Error,
15    streaming::{ChatResponseHandle, ChatResponseSharedState},
16    types::{ConversationMessage, UsageMetadata},
17};
18
19/// Default backoff duration when a quota/429 error doesn't include a
20/// `Retry-After` header.
21const DEFAULT_QUOTA_BACKOFF: std::time::Duration = std::time::Duration::from_secs(2);
22
23/// Duration reported to the caller when all quota retries are exhausted.
24const QUOTA_EXHAUSTED_RETRY_AFTER: std::time::Duration = std::time::Duration::from_mins(2);
25
26#[cfg(test)]
27pub(crate) mod mock;
28
29/// Unique identifier for an agent within the bridge.
30pub type AgentId = u64;
31
32/// Trait abstracting the Python runtime interface.
33///
34/// This allows unit tests to inject a mock runtime without requiring a live
35/// Python interpreter. The real implementation will call through to `PyO3`.
36#[expect(
37    async_fn_in_trait,
38    reason = "Runtime is not object-safe by design; callers always know the concrete type"
39)]
40pub trait Runtime: Send + Sync {
41    /// Create an agent from the given config, returning its ID.
42    async fn create_agent(&self, config: AgentConfig) -> Result<AgentId, Error>;
43
44    /// Send a chat message to the agent, returning a streaming response handle.
45    ///
46    /// The `content` parameter accepts any [`Content`] variant: plain text,
47    /// images, documents, audio, video, or a multi-part list.
48    async fn chat(&self, agent_id: AgentId, content: &Content)
49    -> Result<ChatResponseHandle, Error>;
50
51    /// Gracefully shut down the agent.
52    async fn shutdown_agent(&self, agent_id: AgentId) -> Result<(), Error>;
53
54    /// Interrupt any active prompt/chat run.
55    async fn cancel(&self, agent_id: AgentId) -> Result<(), Error>;
56
57    /// Wait for the active run or conversational loop to stabilize.
58    async fn wait_for_idle(&self, agent_id: AgentId) -> Result<(), Error>;
59
60    /// Send a message without waiting for completion.
61    async fn send(&self, agent_id: AgentId, content: &Content) -> Result<(), Error>;
62
63    /// Signal that the agent is idle.
64    async fn signal_idle(&self, agent_id: AgentId) -> Result<(), Error>;
65
66    /// Wait for the agent to wake up. Returns true if woken, false if timed out.
67    async fn wait_for_wakeup(
68        &self,
69        agent_id: AgentId,
70        timeout: std::time::Duration,
71    ) -> Result<bool, Error>;
72
73    /// Wait if we're in a quota backoff period.
74    async fn wait_for_quota(&self);
75
76    /// Record a quota hit with the suggested retry duration.
77    async fn record_quota_hit(&self, retry_after: std::time::Duration);
78
79    /// Access this runtime's per-key quota registry.
80    ///
81    /// Each runtime owns its own [`QuotaRegistry`](crate::quota::QuotaRegistry),
82    /// so different runtimes have fully independent quota tracking.
83    fn quota_registry(&self) -> &crate::quota::QuotaRegistry;
84
85    /// Retrieve the conversation's message history.
86    async fn history(&self, agent_id: AgentId) -> Result<Vec<ConversationMessage>, Error>;
87
88    /// Return the number of completed turns in the conversation.
89    async fn turn_count(&self, agent_id: AgentId) -> Result<u32, Error>;
90
91    /// Return cumulative token usage across all turns.
92    async fn total_usage(&self, agent_id: AgentId) -> Result<UsageMetadata, Error>;
93
94    /// Return token usage from the most recent turn only.
95    async fn last_turn_usage(&self, agent_id: AgentId) -> Result<UsageMetadata, Error>;
96
97    /// Clear the conversation history and reset state.
98    async fn clear_history(&self, agent_id: AgentId) -> Result<(), Error>;
99
100    /// Return the text of the last model response, if any.
101    ///
102    /// Default implementation returns `Ok(None)`.
103    async fn last_response(&self, _agent_id: AgentId) -> Result<Option<String>, Error> {
104        Ok(None)
105    }
106
107    /// Return the step indices at which compaction occurred.
108    ///
109    /// Default implementation returns an empty list.
110    async fn compaction_indices(&self, _agent_id: AgentId) -> Result<Vec<u32>, Error> {
111        Ok(Vec::new())
112    }
113
114    /// Delete the conversation and all associated state.
115    ///
116    /// Default implementation is a no-op that returns `Ok(())`.
117    async fn delete(&self, _agent_id: AgentId) -> Result<(), Error> {
118        Ok(())
119    }
120
121    /// Disconnect from the agent without deleting state.
122    ///
123    /// Default implementation is a no-op that returns `Ok(())`.
124    async fn disconnect(&self, _agent_id: AgentId) -> Result<(), Error> {
125        Ok(())
126    }
127
128    /// Check whether the agent is currently idle (not running a turn).
129    ///
130    /// Default implementation returns `Ok(true)`.
131    async fn is_idle(&self, _agent_id: AgentId) -> Result<bool, Error> {
132        Ok(true)
133    }
134
135    /// Best-effort synchronous shutdown signal, called from [`Drop`].
136    ///
137    /// Unlike [`shutdown_agent`](Self::shutdown_agent), this is sync and
138    /// fire-and-forget — it cannot return errors. The default is a no-op;
139    /// implementations backed by a command channel should `try_send` a
140    /// shutdown command here.
141    fn try_shutdown_agent(&self, _agent_id: AgentId) {}
142}
143
144/// Handle to a running agent.
145///
146/// Wraps the agent's lifecycle: creation, chat, and shutdown.
147///
148/// Call [`shutdown()`](Self::shutdown) for a clean, error-reported shutdown.
149/// If the handle is dropped without calling `shutdown()`, a best-effort
150/// background shutdown is spawned via [`tokio::spawn`] — the Python agent
151/// will be cleaned up, but errors are only logged, not returned.
152///
153/// Most methods take `&self` — interior mutability is used where needed
154/// so multiple concurrent operations can share a single handle.
155///
156/// # Mutex choice
157///
158/// This type uses [`std::sync::Mutex`] rather than [`tokio::sync::Mutex`]
159/// because every lock acquisition is a brief, synchronous operation (pointer
160/// swap or clone) that **never** spans an `.await` point. For these
161/// microsecond critical sections, `std::sync::Mutex` is both simpler and
162/// lower-overhead than the async alternative.
163pub struct AgentHandle<R: Runtime + 'static> {
164    id: AgentId,
165    runtime: Arc<R>,
166    config: AgentConfig,
167    /// Per-API-key quota state. Agents sharing the same effective API key
168    /// share backoff tracking; agents with different keys are independent.
169    quota_state: Arc<crate::quota::QuotaState>,
170    /// Kept alive for the agent's lifetime so the global `BRIDGE_STATE`
171    /// entry isn't the only strong reference.
172    _registry: Option<Arc<crate::tools::ToolRegistry>>,
173    /// Kept alive to preserve a strong reference to the policy confirmation handler.
174    policy_handler: Option<Arc<dyn crate::policies::AskUserHandler>>,
175    conversation_id: Mutex<Option<String>>,
176    is_started: AtomicBool,
177    is_shutdown: AtomicBool,
178    /// Shared state from the last completed chat response, used to surface
179    /// `get_last_structured_output()` without round-tripping to Python.
180    ///
181    /// Wrapped in a `Mutex` so `chat()` can take `&self` instead of `&mut self`,
182    /// enabling concurrent usage patterns. The lock is brief (pointer swap only).
183    last_shared_state: Mutex<Option<Arc<Mutex<ChatResponseSharedState>>>>,
184}
185
186impl<R: Runtime> AgentHandle<R> {
187    /// Create a new agent from the given runtime and configuration.
188    ///
189    /// This sends a `CreateAgent` command to the Python runtime, waits for
190    /// quota availability, and returns the handle.
191    ///
192    /// # Errors
193    ///
194    /// Returns a [`Error`] if agent creation fails (e.g. invalid config,
195    /// Python error, or quota exceeded).
196    pub async fn new(
197        runtime: Arc<R>,
198        config: AgentConfig,
199        registry: Option<Arc<crate::tools::ToolRegistry>>,
200        hook_runner: Option<Arc<crate::hooks::Hooks>>,
201        policy_handler: Option<Arc<dyn crate::policies::AskUserHandler>>,
202    ) -> Result<Self, Error> {
203        let quota_key = config.effective_api_key().unwrap_or_default();
204        let quota_state = runtime.quota_registry().state_for_key(&quota_key);
205
206        // We must hold CREATE_AGENT_HOOK_GUARD across both agent creation and
207        // bridge_state insertion. This prevents a race where an asynchronous hook
208        // (like on_session_start) fires in Python after create_agent returns but
209        // before the agent is inserted into bridge_state.
210        let _guard = if hook_runner.is_some() {
211            Some(crate::runtime::CREATE_AGENT_HOOK_GUARD.lock().await)
212        } else {
213            None
214        };
215
216        if hook_runner.is_some() {
217            match crate::runtime::INITIALIZING_HOOK_RUNNER.lock() {
218                Ok(mut opt) => {
219                    *opt = hook_runner.as_ref().map(Arc::clone);
220                }
221                Err(e) => {
222                    return Err(Error::BackendError {
223                        message: format!(
224                            "INITIALIZING_HOOK_RUNNER mutex poisoned — hooks cannot be installed: {e}"
225                        ),
226                    });
227                }
228            }
229        }
230
231        let id = runtime.create_agent(config.clone()).await?;
232        tracing::info!(agent_id = id, "Agent created successfully");
233
234        // Build and insert per-agent bridge state in a single lock acquisition.
235        let policies_set = crate::policies::PolicySet::validated_from(config.policies.clone())?;
236        let bridge_entry = crate::runtime::AgentBridgeState {
237            registry: registry.as_ref().map(Arc::clone),
238            hook_runner: hook_runner.as_ref().map(Arc::clone),
239            policies: policies_set,
240            policy_handler: policy_handler.as_ref().map(Arc::clone),
241            tool_state: Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())),
242        };
243        let bridge_insert_failed = match crate::runtime::bridge_state().write() {
244            Ok(mut map) => {
245                map.insert(id, bridge_entry);
246                false
247            }
248            Err(e) => {
249                tracing::error!(
250                    agent_id = id,
251                    error = %e,
252                    "Failed to acquire write lock on BRIDGE_STATE — agent would be unusable"
253                );
254                true
255            }
256        };
257        // Guard is dropped here — safe to .await below.
258        if bridge_insert_failed {
259            // Best-effort shutdown the agent we just created before returning the error.
260            if let Err(shutdown_err) = runtime.shutdown_agent(id).await {
261                tracing::error!(
262                    agent_id = id,
263                    error = ?shutdown_err,
264                    "Failed to shut down agent after BRIDGE_STATE lock failure"
265                );
266            }
267            return Err(Error::BackendError {
268                message: "BRIDGE_STATE RwLock poisoned during agent creation".to_string(),
269            });
270        }
271
272        if hook_runner.is_some() {
273            match crate::runtime::INITIALIZING_HOOK_RUNNER.lock() {
274                Ok(mut opt) => {
275                    *opt = None;
276                }
277                Err(e) => {
278                    // The agent is already created at this point. Log at error
279                    // level — a stale hook runner may cause the next agent
280                    // creation to pick up the wrong hooks, but the current
281                    // agent is functional.
282                    tracing::error!(
283                        agent_id = id,
284                        error = %e,
285                        "INITIALIZING_HOOK_RUNNER mutex poisoned during cleanup — \
286                         stale hook runner may persist"
287                    );
288                }
289            }
290        }
291
292        let conversation_id = Mutex::new(config.conversation_id.clone());
293        Ok(Self {
294            id,
295            runtime,
296            config,
297            quota_state,
298            _registry: registry,
299            policy_handler,
300            conversation_id,
301            is_started: AtomicBool::new(true),
302            is_shutdown: AtomicBool::new(false),
303            last_shared_state: Mutex::new(None),
304        })
305    }
306
307    /// Send a message and receive a streaming response.
308    ///
309    /// Accepts any type that converts into [`Content`]: `&str`, `String`,
310    /// [`Image`](crate::content::Image), [`Document`](crate::content::Document),
311    /// [`Audio`](crate::content::Audio), [`Video`](crate::content::Video), or a
312    /// `Vec<ContentPrimitive>` for multimodal input.
313    ///
314    /// Automatically backs off on quota limits (HTTP 429).
315    ///
316    /// # Errors
317    ///
318    /// Returns a [`Error`] on chat failure (Python error, timeout, etc.).
319    pub async fn chat(&self, content: impl Into<Content>) -> Result<ChatResponseHandle, Error> {
320        if !self.is_started() {
321            return Err(Error::AgentNotStarted);
322        }
323
324        let content = content.into();
325        let max_retries = self.config.max_quota_retries.unwrap_or(0);
326
327        let handle = 'retry: {
328            for attempt in 0..=max_retries {
329                if attempt > 0 {
330                    self.quota_state.wait_for_quota().await;
331                }
332                match self.runtime.chat(self.id, &content).await {
333                    Ok(h) => break 'retry h,
334                    Err(Error::QuotaExceeded { retry_after }) => {
335                        self.handle_quota_error("chat", attempt, max_retries, retry_after)?;
336                    }
337                    Err(ref e) if e.is_quota_error() => {
338                        self.handle_quota_error(
339                            "chat",
340                            attempt,
341                            max_retries,
342                            DEFAULT_QUOTA_BACKOFF,
343                        )?;
344                    }
345                    Err(e) => return Err(e),
346                }
347            }
348            return Err(Error::QuotaExceeded {
349                retry_after: QUOTA_EXHAUSTED_RETRY_AFTER,
350            });
351        };
352
353        match self.last_shared_state.lock() {
354            Ok(mut guard) => {
355                *guard = Some(Arc::clone(&handle.shared_state));
356            }
357            Err(e) => {
358                tracing::error!(
359                    agent_id = self.id,
360                    error = %e,
361                    "last_shared_state mutex poisoned — streaming metadata may be stale"
362                );
363            }
364        }
365        Ok(handle)
366    }
367
368    /// Send a message and return the final text response.
369    ///
370    /// This is a convenience wrapper around [`chat`](Self::chat) that drains
371    /// the streaming response into a single `String`. If tools were associated
372    /// with the agent at creation time, the Python runtime handles tool
373    /// execution automatically.
374    ///
375    /// # Errors
376    ///
377    /// Returns [`Error`] if the chat turn fails or stream errors occur.
378    pub async fn chat_text(&self, message: impl Into<Content>) -> Result<String, Error> {
379        let response = self.chat(message.into()).await?;
380        let text = response.text().await.map_err(|e| {
381            let converted = Error::from(e);
382            if matches!(converted, Error::Safety) {
383                converted
384            } else {
385                Error::BackendError {
386                    message: format!("Failed to read response text: {converted}"),
387                }
388            }
389        })?;
390        Ok(text.into_string())
391    }
392
393    /// Return the current conversation ID, if one has been set.
394    ///
395    /// Returns a cloned `String` because the underlying value is behind a
396    /// [`Mutex`] (interior mutability for `&self` access).
397    #[must_use]
398    pub fn conversation_id(&self) -> Option<String> {
399        self.conversation_id
400            .lock()
401            .inspect_err(|e| {
402                tracing::error!(
403                    agent_id = self.id,
404                    error = %e,
405                    "conversation_id mutex poisoned"
406                );
407            })
408            .ok()
409            .and_then(|guard| guard.clone())
410    }
411
412    /// Set the conversation ID (called when the SDK assigns one).
413    ///
414    /// Takes `&self` rather than `&mut self` so the handle can be shared
415    /// across concurrent tasks.
416    pub fn set_conversation_id(&self, id: String) {
417        match self.conversation_id.lock() {
418            Ok(mut guard) => {
419                *guard = Some(id);
420            }
421            Err(e) => {
422                tracing::error!(
423                    agent_id = self.id,
424                    error = %e,
425                    "conversation_id mutex poisoned — ID will not be updated"
426                );
427            }
428        }
429    }
430
431    /// Check whether the agent has been started and is not yet shut down.
432    #[must_use]
433    pub fn is_started(&self) -> bool {
434        self.is_started.load(Ordering::SeqCst) && !self.is_shutdown.load(Ordering::SeqCst)
435    }
436
437    /// Return the agent's unique identifier.
438    #[must_use]
439    pub const fn id(&self) -> AgentId {
440        self.id
441    }
442
443    /// Return a reference to the agent's configuration.
444    #[must_use]
445    pub const fn config(&self) -> &AgentConfig {
446        &self.config
447    }
448
449    /// Interrupt the active chat prompt execution.
450    ///
451    /// # Errors
452    ///
453    /// Returns a [`Error`] if the cancellation call fails.
454    pub async fn cancel(&self) -> Result<(), Error> {
455        self.runtime.cancel(self.id).await
456    }
457
458    /// Wait for the conversation or active run to stabilize and become idle.
459    ///
460    /// # Errors
461    ///
462    /// Returns a [`Error`] if the wait call fails.
463    pub async fn wait_for_idle(&self) -> Result<(), Error> {
464        self.runtime.wait_for_idle(self.id).await
465    }
466
467    /// Retrieve the conversation's message history.
468    ///
469    /// # Errors
470    ///
471    /// Returns [`Error`] if the query fails.
472    pub async fn history(&self) -> Result<Vec<ConversationMessage>, Error> {
473        self.runtime.history(self.id).await
474    }
475
476    /// Return the number of completed turns in the conversation.
477    ///
478    /// # Errors
479    ///
480    /// Returns [`Error`] if the query fails.
481    pub async fn turn_count(&self) -> Result<u32, Error> {
482        self.runtime.turn_count(self.id).await
483    }
484
485    /// Return cumulative token usage across all turns.
486    ///
487    /// # Errors
488    ///
489    /// Returns [`Error`] if the query fails.
490    pub async fn total_usage(&self) -> Result<UsageMetadata, Error> {
491        self.runtime.total_usage(self.id).await
492    }
493
494    /// Return token usage from the most recent turn only.
495    ///
496    /// # Errors
497    ///
498    /// Returns [`Error`] if the query fails.
499    pub async fn last_turn_usage(&self) -> Result<UsageMetadata, Error> {
500        self.runtime.last_turn_usage(self.id).await
501    }
502
503    /// Clear the conversation history and reset state.
504    ///
505    /// # Errors
506    ///
507    /// Returns [`Error`] if the operation fails.
508    pub async fn clear_history(&self) -> Result<(), Error> {
509        self.runtime.clear_history(self.id).await
510    }
511
512    /// Return the text of the last model response, if any.
513    ///
514    /// # Errors
515    ///
516    /// Returns [`Error`] if the query fails.
517    pub async fn last_response(&self) -> Result<Option<String>, Error> {
518        self.runtime.last_response(self.id).await
519    }
520
521    /// Return the step indices at which conversation compaction occurred.
522    ///
523    /// # Errors
524    ///
525    /// Returns [`Error`] if the query fails.
526    pub async fn compaction_indices(&self) -> Result<Vec<u32>, Error> {
527        self.runtime.compaction_indices(self.id).await
528    }
529
530    /// Delete the conversation and all associated state.
531    ///
532    /// After calling this method, the agent handle is no longer usable
533    /// for chat operations. This also marks the agent as shut down.
534    ///
535    /// # Errors
536    ///
537    /// Returns [`Error`] if the delete operation fails.
538    pub async fn delete(&self) -> Result<(), Error> {
539        let result = self.runtime.delete(self.id).await;
540        self.is_shutdown.store(true, Ordering::SeqCst);
541        result
542    }
543
544    /// Disconnect from the agent without deleting its state.
545    ///
546    /// The agent's conversation state is preserved but this handle
547    /// can no longer send messages. Marks the agent as shut down.
548    ///
549    /// # Errors
550    ///
551    /// Returns [`Error`] if the disconnect operation fails.
552    pub async fn disconnect(&self) -> Result<(), Error> {
553        let result = self.runtime.disconnect(self.id).await;
554        self.is_shutdown.store(true, Ordering::SeqCst);
555        result
556    }
557
558    /// Check whether the agent is currently idle (not running a turn).
559    ///
560    /// # Errors
561    ///
562    /// Returns [`Error`] if the query fails.
563    pub async fn is_idle(&self) -> Result<bool, Error> {
564        self.runtime.is_idle(self.id).await
565    }
566
567    /// Return the structured output from the last chat response, if any.
568    ///
569    /// Only populated after a [`chat()`](Self::chat) round-trip when the
570    /// agent was configured with a `response_schema` and the model returned
571    /// a valid JSON payload.
572    #[must_use]
573    pub fn get_last_structured_output(&self) -> Option<serde_json::Value> {
574        let guard = self
575            .last_shared_state
576            .lock()
577            .inspect_err(|e| {
578                tracing::error!(
579                    agent_id = self.id,
580                    error = %e,
581                    "last_shared_state mutex poisoned in get_last_structured_output"
582                );
583            })
584            .ok()?;
585        let state = guard
586            .as_ref()?
587            .lock()
588            .inspect_err(|e| {
589                tracing::error!(
590                    agent_id = self.id,
591                    error = %e,
592                    "ChatResponseSharedState mutex poisoned in get_last_structured_output"
593                );
594            })
595            .ok()?;
596        state.structured_output.clone()
597    }
598
599    /// Return the structured output from the last chat response deserialized into `T`.
600    ///
601    /// Returns `None` if there was no structured output on the last response.
602    /// Returns `Some(Err(...))` if the structured output could not be deserialized as `T`.
603    pub fn get_last_structured_output_as<T: serde::de::DeserializeOwned>(
604        &self,
605    ) -> Option<Result<T, serde_json::Error>> {
606        self.get_last_structured_output()
607            .map(serde_json::from_value)
608    }
609
610    /// Return the usage metadata from the last chat response, if any.
611    #[must_use]
612    pub fn get_last_usage(&self) -> Option<UsageMetadata> {
613        let guard = self
614            .last_shared_state
615            .lock()
616            .inspect_err(|e| {
617                tracing::error!(
618                    agent_id = self.id,
619                    error = %e,
620                    "last_shared_state mutex poisoned in get_last_usage"
621                );
622            })
623            .ok()?;
624        let state = guard
625            .as_ref()?
626            .lock()
627            .inspect_err(|e| {
628                tracing::error!(
629                    agent_id = self.id,
630                    error = %e,
631                    "ChatResponseSharedState mutex poisoned in get_last_usage"
632                );
633            })
634            .ok()?;
635        state.usage.clone()
636    }
637
638    /// Send a message without waiting for a response.
639    ///
640    /// Fire-and-forget: the message is delivered to the agent but no
641    /// streaming response is produced.
642    ///
643    /// # Errors
644    ///
645    /// Returns a [`Error`] if sending fails.
646    pub async fn send(&self, content: impl Into<Content>) -> Result<(), Error> {
647        if !self.is_started() {
648            return Err(Error::AgentNotStarted);
649        }
650
651        let content = content.into();
652
653        let max_retries = self.config.max_quota_retries.unwrap_or(0);
654
655        for attempt in 0..=max_retries {
656            if attempt > 0 {
657                self.quota_state.wait_for_quota().await;
658            }
659            match self.runtime.send(self.id, &content).await {
660                Ok(()) => return Ok(()),
661                Err(Error::QuotaExceeded { retry_after }) => {
662                    self.handle_quota_error("send", attempt, max_retries, retry_after)?;
663                }
664                Err(ref e) if e.is_quota_error() => {
665                    self.handle_quota_error("send", attempt, max_retries, DEFAULT_QUOTA_BACKOFF)?;
666                }
667                Err(e) => return Err(e),
668            }
669        }
670        Err(Error::QuotaExceeded {
671            retry_after: QUOTA_EXHAUSTED_RETRY_AFTER,
672        })
673    }
674
675    /// Signal that this agent is idle and ready to receive input.
676    ///
677    /// # Errors
678    ///
679    /// Returns a [`Error`] if the signal call fails.
680    pub async fn signal_idle(&self) -> Result<(), Error> {
681        self.runtime.signal_idle(self.id).await
682    }
683
684    /// Wait for the agent to wake up, returning `true` if woken or
685    /// `false` if the `timeout` elapsed.
686    ///
687    /// # Errors
688    ///
689    /// Returns a [`Error`] if the wait call fails.
690    pub async fn wait_for_wakeup(&self, timeout: std::time::Duration) -> Result<bool, Error> {
691        self.runtime.wait_for_wakeup(self.id, timeout).await
692    }
693
694    /// Handle a quota/429 error from a retryable operation.
695    fn handle_quota_error(
696        &self,
697        operation: &str,
698        attempt: u32,
699        max_retries: u32,
700        retry_after: std::time::Duration,
701    ) -> Result<(), Error> {
702        if attempt >= max_retries {
703            return Err(Error::QuotaExceeded { retry_after });
704        }
705        tracing::warn!(
706            agent_id = self.id,
707            attempt = attempt + 1,
708            max = max_retries,
709            retry_after_ms = u64::try_from(retry_after.as_millis()).unwrap_or_else(|e| {
710                tracing::warn!("Int conversion failed: {e}");
711                u64::MAX
712            }),
713            "Quota exceeded on {operation} — recording hit and retrying"
714        );
715        self.quota_state.record_quota_hit(retry_after);
716        Ok(())
717    }
718
719    /// Gracefully shut down the agent.
720    ///
721    /// This sends a `ShutdownAgent` command to the Python runtime, which
722    /// calls `__aexit__()` on the SDK agent. The handle remains usable
723    /// for read-only queries (e.g. [`is_started()`](Self::is_started))
724    /// after shutdown.
725    ///
726    /// # Errors
727    ///
728    /// Returns a [`Error`] if shutdown fails. The `is_shutdown`
729    /// flag is always set so the `Drop` impl will not emit a warning.
730    pub async fn shutdown(&self) -> Result<(), Error> {
731        if self.is_shutdown.load(Ordering::SeqCst) {
732            tracing::debug!(agent_id = self.id, "Agent already shut down");
733            return Ok(());
734        }
735
736        tracing::info!(agent_id = self.id, "Shutting down agent");
737        let result = self.runtime.shutdown_agent(self.id).await;
738
739        // Always mark as shut down so Drop doesn't warn, even on failure.
740        self.is_shutdown.store(true, Ordering::SeqCst);
741
742        // Clean up bridge state AFTER the runtime's shutdown completes.
743        // In the live runtime, `__aexit__` fires hooks (e.g. on_session_end)
744        // that look up bridge state — so this must happen after, not before.
745        match crate::runtime::bridge_state().write() {
746            Ok(mut map) => {
747                map.remove(&self.id);
748            }
749            Err(e) => {
750                tracing::error!(
751                    agent_id = self.id,
752                    error = %e,
753                    "BRIDGE_STATE RwLock poisoned during shutdown cleanup — \
754                     bridge state entry may leak"
755                );
756            }
757        }
758
759        match result {
760            Ok(()) => {
761                tracing::info!(agent_id = self.id, "Agent shut down successfully");
762            }
763            Err(ref e) => {
764                tracing::error!(agent_id = self.id, error = ?e, "Agent shutdown failed");
765            }
766        }
767
768        result
769    }
770
771    /// Spawn a subagent from the given config, sharing this agent's runtime.
772    ///
773    /// If a `ToolRegistry` is provided and `config.tools` is empty, the
774    /// registry's definitions are automatically applied.
775    ///
776    /// # Errors
777    ///
778    /// Returns a [`Error`] if agent creation fails.
779    pub async fn spawn_subagent(
780        &self,
781        mut config: AgentConfig,
782        registry: impl Into<Option<crate::tools::ToolRegistry>>,
783    ) -> Result<Self, Error> {
784        let opt_registry = registry.into();
785        if let Some(disp) = &opt_registry
786            && config.tools.is_empty()
787        {
788            config.tools = disp.definitions();
789        }
790        let arc_registry = opt_registry.map(Arc::new);
791        Self::new(
792            Arc::clone(&self.runtime),
793            config,
794            arc_registry,
795            None,
796            self.policy_handler.clone(),
797        )
798        .await
799    }
800}
801
802impl<R: Runtime> Drop for AgentHandle<R> {
803    fn drop(&mut self) {
804        if self.is_started.load(Ordering::SeqCst) && !self.is_shutdown.load(Ordering::SeqCst) {
805            tracing::debug!(
806                agent_id = self.id,
807                "AgentHandle dropped without explicit shutdown() — \
808                 sending best-effort shutdown signal"
809            );
810            // try_shutdown_agent fires a command that eventually calls
811            // handle_shutdown_agent, which cleans up bridge state AFTER
812            // __aexit__ completes (so on_session_end hooks can still
813            // find the hook runner). Do NOT clean up bridge state here.
814            self.runtime.try_shutdown_agent(self.id);
815        } else if self.is_shutdown.load(Ordering::SeqCst) {
816            // shutdown() was already called — handle_shutdown_agent
817            // already cleaned up bridge state after __aexit__. Nothing
818            // to do.
819        } else {
820            // Agent was never started (e.g. creation failed). Clean up
821            // any partial bridge state that might have been registered.
822            if let Ok(mut map) = crate::runtime::bridge_state().write() {
823                map.remove(&self.id);
824            } else {
825                tracing::error!(
826                    agent_id = self.id,
827                    "BRIDGE_STATE RwLock poisoned during Drop — \
828                     bridge state entry for this agent may leak"
829                );
830            }
831        }
832    }
833}