agy_bridge/agent/mod.rs
1//! Agent lifecycle management for the Antigravity SDK bridge.
2//!
3//! Provides [`AgentHandle`](crate::agent::AgentHandle) which wraps the lifecycle of a single SDK agent:
4//! creation, chatting, conversation tracking, and shutdown with RAII warnings.
5
6use std::sync::{
7 Arc, Mutex,
8 atomic::{AtomicBool, Ordering},
9};
10
11use crate::{
12 config::AgentConfig,
13 content::Content,
14 error::Error,
15 streaming::{ChatResponseHandle, ChatResponseSharedState},
16 types::{ConversationMessage, UsageMetadata},
17};
18
/// Backoff recorded for a quota/429 error that is recognised by
/// `Error::is_quota_error()` but is not an `Error::QuotaExceeded` carrying
/// its own `retry_after` (e.g. no `Retry-After` header was available).
const DEFAULT_QUOTA_BACKOFF: std::time::Duration = std::time::Duration::from_secs(2);

/// Fallback `retry_after` returned if a quota retry loop ever runs to
/// completion without returning.
///
/// When the final attempt itself hits a quota error, the caller receives that
/// attempt's own `retry_after` instead; this value is a defensive default for
/// the fallthrough path. Two minutes, spelled in seconds for toolchain
/// portability.
const QUOTA_EXHAUSTED_RETRY_AFTER: std::time::Duration = std::time::Duration::from_secs(2 * 60);
25
/// Mock [`Runtime`] used by this module's unit tests (test builds only).
#[cfg(test)]
pub(crate) mod mock;

/// Identifier assigned to an agent by the runtime; unique within the bridge.
pub type AgentId = u64;
31
32/// Trait abstracting the Python runtime interface.
33///
34/// This allows unit tests to inject a mock runtime without requiring a live
35/// Python interpreter. The real implementation will call through to `PyO3`.
36#[expect(
37 async_fn_in_trait,
38 reason = "Runtime is not object-safe by design; callers always know the concrete type"
39)]
40pub trait Runtime: Send + Sync {
41 /// Create an agent from the given config, returning its ID.
42 async fn create_agent(&self, config: AgentConfig) -> Result<AgentId, Error>;
43
44 /// Send a chat message to the agent, returning a streaming response handle.
45 ///
46 /// The `content` parameter accepts any [`Content`] variant: plain text,
47 /// images, documents, audio, video, or a multi-part list.
48 async fn chat(&self, agent_id: AgentId, content: &Content)
49 -> Result<ChatResponseHandle, Error>;
50
51 /// Gracefully shut down the agent.
52 async fn shutdown_agent(&self, agent_id: AgentId) -> Result<(), Error>;
53
54 /// Interrupt any active prompt/chat run.
55 async fn cancel(&self, agent_id: AgentId) -> Result<(), Error>;
56
57 /// Wait for the active run or conversational loop to stabilize.
58 async fn wait_for_idle(&self, agent_id: AgentId) -> Result<(), Error>;
59
60 /// Send a message without waiting for completion.
61 async fn send(&self, agent_id: AgentId, content: &Content) -> Result<(), Error>;
62
63 /// Signal that the agent is idle.
64 async fn signal_idle(&self, agent_id: AgentId) -> Result<(), Error>;
65
66 /// Wait for the agent to wake up. Returns true if woken, false if timed out.
67 async fn wait_for_wakeup(
68 &self,
69 agent_id: AgentId,
70 timeout: std::time::Duration,
71 ) -> Result<bool, Error>;
72
73 /// Wait if we're in a quota backoff period.
74 async fn wait_for_quota(&self);
75
76 /// Record a quota hit with the suggested retry duration.
77 async fn record_quota_hit(&self, retry_after: std::time::Duration);
78
79 /// Access this runtime's per-key quota registry.
80 ///
81 /// Each runtime owns its own [`QuotaRegistry`](crate::quota::QuotaRegistry),
82 /// so different runtimes have fully independent quota tracking.
83 fn quota_registry(&self) -> &crate::quota::QuotaRegistry;
84
85 /// Retrieve the conversation's message history.
86 async fn history(&self, agent_id: AgentId) -> Result<Vec<ConversationMessage>, Error>;
87
88 /// Return the number of completed turns in the conversation.
89 async fn turn_count(&self, agent_id: AgentId) -> Result<u32, Error>;
90
91 /// Return cumulative token usage across all turns.
92 async fn total_usage(&self, agent_id: AgentId) -> Result<UsageMetadata, Error>;
93
94 /// Return token usage from the most recent turn only.
95 async fn last_turn_usage(&self, agent_id: AgentId) -> Result<UsageMetadata, Error>;
96
97 /// Clear the conversation history and reset state.
98 async fn clear_history(&self, agent_id: AgentId) -> Result<(), Error>;
99
100 /// Return the text of the last model response, if any.
101 ///
102 /// Default implementation returns `Ok(None)`.
103 async fn last_response(&self, _agent_id: AgentId) -> Result<Option<String>, Error> {
104 Ok(None)
105 }
106
107 /// Return the step indices at which compaction occurred.
108 ///
109 /// Default implementation returns an empty list.
110 async fn compaction_indices(&self, _agent_id: AgentId) -> Result<Vec<u32>, Error> {
111 Ok(Vec::new())
112 }
113
114 /// Delete the conversation and all associated state.
115 ///
116 /// Default implementation is a no-op that returns `Ok(())`.
117 async fn delete(&self, _agent_id: AgentId) -> Result<(), Error> {
118 Ok(())
119 }
120
121 /// Disconnect from the agent without deleting state.
122 ///
123 /// Default implementation is a no-op that returns `Ok(())`.
124 async fn disconnect(&self, _agent_id: AgentId) -> Result<(), Error> {
125 Ok(())
126 }
127
128 /// Check whether the agent is currently idle (not running a turn).
129 ///
130 /// Default implementation returns `Ok(true)`.
131 async fn is_idle(&self, _agent_id: AgentId) -> Result<bool, Error> {
132 Ok(true)
133 }
134
135 /// Best-effort synchronous shutdown signal, called from [`Drop`].
136 ///
137 /// Unlike [`shutdown_agent`](Self::shutdown_agent), this is sync and
138 /// fire-and-forget — it cannot return errors. The default is a no-op;
139 /// implementations backed by a command channel should `try_send` a
140 /// shutdown command here.
141 fn try_shutdown_agent(&self, _agent_id: AgentId) {}
142}
143
144/// Handle to a running agent.
145///
146/// Wraps the agent's lifecycle: creation, chat, and shutdown.
147///
148/// Call [`shutdown()`](Self::shutdown) for a clean, error-reported shutdown.
149/// If the handle is dropped without calling `shutdown()`, a best-effort
150/// background shutdown is spawned via [`tokio::spawn`] — the Python agent
151/// will be cleaned up, but errors are only logged, not returned.
152///
153/// Most methods take `&self` — interior mutability is used where needed
154/// so multiple concurrent operations can share a single handle.
155///
156/// # Mutex choice
157///
158/// This type uses [`std::sync::Mutex`] rather than [`tokio::sync::Mutex`]
159/// because every lock acquisition is a brief, synchronous operation (pointer
160/// swap or clone) that **never** spans an `.await` point. For these
161/// microsecond critical sections, `std::sync::Mutex` is both simpler and
162/// lower-overhead than the async alternative.
163pub struct AgentHandle<R: Runtime + 'static> {
164 id: AgentId,
165 runtime: Arc<R>,
166 config: AgentConfig,
167 /// Per-API-key quota state. Agents sharing the same effective API key
168 /// share backoff tracking; agents with different keys are independent.
169 quota_state: Arc<crate::quota::QuotaState>,
170 /// Kept alive for the agent's lifetime so the global `BRIDGE_STATE`
171 /// entry isn't the only strong reference.
172 _registry: Option<Arc<crate::tools::ToolRegistry>>,
173 /// Kept alive to preserve a strong reference to the policy confirmation handler.
174 policy_handler: Option<Arc<dyn crate::policies::AskUserHandler>>,
175 conversation_id: Mutex<Option<String>>,
176 is_started: AtomicBool,
177 is_shutdown: AtomicBool,
178 /// Shared state from the last completed chat response, used to surface
179 /// `get_last_structured_output()` without round-tripping to Python.
180 ///
181 /// Wrapped in a `Mutex` so `chat()` can take `&self` instead of `&mut self`,
182 /// enabling concurrent usage patterns. The lock is brief (pointer swap only).
183 last_shared_state: Mutex<Option<Arc<Mutex<ChatResponseSharedState>>>>,
184}
185
186impl<R: Runtime> AgentHandle<R> {
187 /// Create a new agent from the given runtime and configuration.
188 ///
189 /// This sends a `CreateAgent` command to the Python runtime, waits for
190 /// quota availability, and returns the handle.
191 ///
192 /// # Errors
193 ///
194 /// Returns a [`Error`] if agent creation fails (e.g. invalid config,
195 /// Python error, or quota exceeded).
196 pub async fn new(
197 runtime: Arc<R>,
198 config: AgentConfig,
199 registry: Option<Arc<crate::tools::ToolRegistry>>,
200 hook_runner: Option<Arc<crate::hooks::Hooks>>,
201 policy_handler: Option<Arc<dyn crate::policies::AskUserHandler>>,
202 ) -> Result<Self, Error> {
203 let quota_key = config.effective_api_key().unwrap_or_default();
204 let quota_state = runtime.quota_registry().state_for_key("a_key);
205
206 // We must hold CREATE_AGENT_HOOK_GUARD across both agent creation and
207 // bridge_state insertion. This prevents a race where an asynchronous hook
208 // (like on_session_start) fires in Python after create_agent returns but
209 // before the agent is inserted into bridge_state.
210 let _guard = if hook_runner.is_some() {
211 Some(crate::runtime::CREATE_AGENT_HOOK_GUARD.lock().await)
212 } else {
213 None
214 };
215
216 if hook_runner.is_some() {
217 match crate::runtime::INITIALIZING_HOOK_RUNNER.lock() {
218 Ok(mut opt) => {
219 *opt = hook_runner.as_ref().map(Arc::clone);
220 }
221 Err(e) => {
222 return Err(Error::BackendError {
223 message: format!(
224 "INITIALIZING_HOOK_RUNNER mutex poisoned — hooks cannot be installed: {e}"
225 ),
226 });
227 }
228 }
229 }
230
231 let id = runtime.create_agent(config.clone()).await?;
232 tracing::info!(agent_id = id, "Agent created successfully");
233
234 // Build and insert per-agent bridge state in a single lock acquisition.
235 let policies_set = crate::policies::PolicySet::validated_from(config.policies.clone())?;
236 let bridge_entry = crate::runtime::AgentBridgeState {
237 registry: registry.as_ref().map(Arc::clone),
238 hook_runner: hook_runner.as_ref().map(Arc::clone),
239 policies: policies_set,
240 policy_handler: policy_handler.as_ref().map(Arc::clone),
241 tool_state: Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())),
242 };
243 let bridge_insert_failed = match crate::runtime::bridge_state().write() {
244 Ok(mut map) => {
245 map.insert(id, bridge_entry);
246 false
247 }
248 Err(e) => {
249 tracing::error!(
250 agent_id = id,
251 error = %e,
252 "Failed to acquire write lock on BRIDGE_STATE — agent would be unusable"
253 );
254 true
255 }
256 };
257 // Guard is dropped here — safe to .await below.
258 if bridge_insert_failed {
259 // Best-effort shutdown the agent we just created before returning the error.
260 if let Err(shutdown_err) = runtime.shutdown_agent(id).await {
261 tracing::error!(
262 agent_id = id,
263 error = ?shutdown_err,
264 "Failed to shut down agent after BRIDGE_STATE lock failure"
265 );
266 }
267 return Err(Error::BackendError {
268 message: "BRIDGE_STATE RwLock poisoned during agent creation".to_string(),
269 });
270 }
271
272 if hook_runner.is_some() {
273 match crate::runtime::INITIALIZING_HOOK_RUNNER.lock() {
274 Ok(mut opt) => {
275 *opt = None;
276 }
277 Err(e) => {
278 // The agent is already created at this point. Log at error
279 // level — a stale hook runner may cause the next agent
280 // creation to pick up the wrong hooks, but the current
281 // agent is functional.
282 tracing::error!(
283 agent_id = id,
284 error = %e,
285 "INITIALIZING_HOOK_RUNNER mutex poisoned during cleanup — \
286 stale hook runner may persist"
287 );
288 }
289 }
290 }
291
292 let conversation_id = Mutex::new(config.conversation_id.clone());
293 Ok(Self {
294 id,
295 runtime,
296 config,
297 quota_state,
298 _registry: registry,
299 policy_handler,
300 conversation_id,
301 is_started: AtomicBool::new(true),
302 is_shutdown: AtomicBool::new(false),
303 last_shared_state: Mutex::new(None),
304 })
305 }
306
307 /// Send a message and receive a streaming response.
308 ///
309 /// Accepts any type that converts into [`Content`]: `&str`, `String`,
310 /// [`Image`](crate::content::Image), [`Document`](crate::content::Document),
311 /// [`Audio`](crate::content::Audio), [`Video`](crate::content::Video), or a
312 /// `Vec<ContentPrimitive>` for multimodal input.
313 ///
314 /// Automatically backs off on quota limits (HTTP 429).
315 ///
316 /// # Errors
317 ///
318 /// Returns a [`Error`] on chat failure (Python error, timeout, etc.).
319 pub async fn chat(&self, content: impl Into<Content>) -> Result<ChatResponseHandle, Error> {
320 if !self.is_started() {
321 return Err(Error::AgentNotStarted);
322 }
323
324 let content = content.into();
325 let max_retries = self.config.max_quota_retries.unwrap_or(0);
326
327 let handle = 'retry: {
328 for attempt in 0..=max_retries {
329 if attempt > 0 {
330 self.quota_state.wait_for_quota().await;
331 }
332 match self.runtime.chat(self.id, &content).await {
333 Ok(h) => break 'retry h,
334 Err(Error::QuotaExceeded { retry_after }) => {
335 self.handle_quota_error("chat", attempt, max_retries, retry_after)?;
336 }
337 Err(ref e) if e.is_quota_error() => {
338 self.handle_quota_error(
339 "chat",
340 attempt,
341 max_retries,
342 DEFAULT_QUOTA_BACKOFF,
343 )?;
344 }
345 Err(e) => return Err(e),
346 }
347 }
348 return Err(Error::QuotaExceeded {
349 retry_after: QUOTA_EXHAUSTED_RETRY_AFTER,
350 });
351 };
352
353 match self.last_shared_state.lock() {
354 Ok(mut guard) => {
355 *guard = Some(Arc::clone(&handle.shared_state));
356 }
357 Err(e) => {
358 tracing::error!(
359 agent_id = self.id,
360 error = %e,
361 "last_shared_state mutex poisoned — streaming metadata may be stale"
362 );
363 }
364 }
365 Ok(handle)
366 }
367
368 /// Send a message and return the final text response.
369 ///
370 /// This is a convenience wrapper around [`chat`](Self::chat) that drains
371 /// the streaming response into a single `String`. If tools were associated
372 /// with the agent at creation time, the Python runtime handles tool
373 /// execution automatically.
374 ///
375 /// # Errors
376 ///
377 /// Returns [`Error`] if the chat turn fails or stream errors occur.
378 pub async fn chat_text(&self, message: impl Into<Content>) -> Result<String, Error> {
379 let response = self.chat(message.into()).await?;
380 let text = response.text().await.map_err(|e| {
381 let converted = Error::from(e);
382 if matches!(converted, Error::Safety) {
383 converted
384 } else {
385 Error::BackendError {
386 message: format!("Failed to read response text: {converted}"),
387 }
388 }
389 })?;
390 Ok(text.into_string())
391 }
392
393 /// Return the current conversation ID, if one has been set.
394 ///
395 /// Returns a cloned `String` because the underlying value is behind a
396 /// [`Mutex`] (interior mutability for `&self` access).
397 #[must_use]
398 pub fn conversation_id(&self) -> Option<String> {
399 self.conversation_id
400 .lock()
401 .inspect_err(|e| {
402 tracing::error!(
403 agent_id = self.id,
404 error = %e,
405 "conversation_id mutex poisoned"
406 );
407 })
408 .ok()
409 .and_then(|guard| guard.clone())
410 }
411
412 /// Set the conversation ID (called when the SDK assigns one).
413 ///
414 /// Takes `&self` rather than `&mut self` so the handle can be shared
415 /// across concurrent tasks.
416 pub fn set_conversation_id(&self, id: String) {
417 match self.conversation_id.lock() {
418 Ok(mut guard) => {
419 *guard = Some(id);
420 }
421 Err(e) => {
422 tracing::error!(
423 agent_id = self.id,
424 error = %e,
425 "conversation_id mutex poisoned — ID will not be updated"
426 );
427 }
428 }
429 }
430
431 /// Check whether the agent has been started and is not yet shut down.
432 #[must_use]
433 pub fn is_started(&self) -> bool {
434 self.is_started.load(Ordering::SeqCst) && !self.is_shutdown.load(Ordering::SeqCst)
435 }
436
437 /// Return the agent's unique identifier.
438 #[must_use]
439 pub const fn id(&self) -> AgentId {
440 self.id
441 }
442
443 /// Return a reference to the agent's configuration.
444 #[must_use]
445 pub const fn config(&self) -> &AgentConfig {
446 &self.config
447 }
448
449 /// Interrupt the active chat prompt execution.
450 ///
451 /// # Errors
452 ///
453 /// Returns a [`Error`] if the cancellation call fails.
454 pub async fn cancel(&self) -> Result<(), Error> {
455 self.runtime.cancel(self.id).await
456 }
457
458 /// Wait for the conversation or active run to stabilize and become idle.
459 ///
460 /// # Errors
461 ///
462 /// Returns a [`Error`] if the wait call fails.
463 pub async fn wait_for_idle(&self) -> Result<(), Error> {
464 self.runtime.wait_for_idle(self.id).await
465 }
466
467 /// Retrieve the conversation's message history.
468 ///
469 /// # Errors
470 ///
471 /// Returns [`Error`] if the query fails.
472 pub async fn history(&self) -> Result<Vec<ConversationMessage>, Error> {
473 self.runtime.history(self.id).await
474 }
475
476 /// Return the number of completed turns in the conversation.
477 ///
478 /// # Errors
479 ///
480 /// Returns [`Error`] if the query fails.
481 pub async fn turn_count(&self) -> Result<u32, Error> {
482 self.runtime.turn_count(self.id).await
483 }
484
485 /// Return cumulative token usage across all turns.
486 ///
487 /// # Errors
488 ///
489 /// Returns [`Error`] if the query fails.
490 pub async fn total_usage(&self) -> Result<UsageMetadata, Error> {
491 self.runtime.total_usage(self.id).await
492 }
493
494 /// Return token usage from the most recent turn only.
495 ///
496 /// # Errors
497 ///
498 /// Returns [`Error`] if the query fails.
499 pub async fn last_turn_usage(&self) -> Result<UsageMetadata, Error> {
500 self.runtime.last_turn_usage(self.id).await
501 }
502
503 /// Clear the conversation history and reset state.
504 ///
505 /// # Errors
506 ///
507 /// Returns [`Error`] if the operation fails.
508 pub async fn clear_history(&self) -> Result<(), Error> {
509 self.runtime.clear_history(self.id).await
510 }
511
512 /// Return the text of the last model response, if any.
513 ///
514 /// # Errors
515 ///
516 /// Returns [`Error`] if the query fails.
517 pub async fn last_response(&self) -> Result<Option<String>, Error> {
518 self.runtime.last_response(self.id).await
519 }
520
521 /// Return the step indices at which conversation compaction occurred.
522 ///
523 /// # Errors
524 ///
525 /// Returns [`Error`] if the query fails.
526 pub async fn compaction_indices(&self) -> Result<Vec<u32>, Error> {
527 self.runtime.compaction_indices(self.id).await
528 }
529
530 /// Delete the conversation and all associated state.
531 ///
532 /// After calling this method, the agent handle is no longer usable
533 /// for chat operations. This also marks the agent as shut down.
534 ///
535 /// # Errors
536 ///
537 /// Returns [`Error`] if the delete operation fails.
538 pub async fn delete(&self) -> Result<(), Error> {
539 let result = self.runtime.delete(self.id).await;
540 self.is_shutdown.store(true, Ordering::SeqCst);
541 result
542 }
543
544 /// Disconnect from the agent without deleting its state.
545 ///
546 /// The agent's conversation state is preserved but this handle
547 /// can no longer send messages. Marks the agent as shut down.
548 ///
549 /// # Errors
550 ///
551 /// Returns [`Error`] if the disconnect operation fails.
552 pub async fn disconnect(&self) -> Result<(), Error> {
553 let result = self.runtime.disconnect(self.id).await;
554 self.is_shutdown.store(true, Ordering::SeqCst);
555 result
556 }
557
558 /// Check whether the agent is currently idle (not running a turn).
559 ///
560 /// # Errors
561 ///
562 /// Returns [`Error`] if the query fails.
563 pub async fn is_idle(&self) -> Result<bool, Error> {
564 self.runtime.is_idle(self.id).await
565 }
566
567 /// Return the structured output from the last chat response, if any.
568 ///
569 /// Only populated after a [`chat()`](Self::chat) round-trip when the
570 /// agent was configured with a `response_schema` and the model returned
571 /// a valid JSON payload.
572 #[must_use]
573 pub fn get_last_structured_output(&self) -> Option<serde_json::Value> {
574 let guard = self
575 .last_shared_state
576 .lock()
577 .inspect_err(|e| {
578 tracing::error!(
579 agent_id = self.id,
580 error = %e,
581 "last_shared_state mutex poisoned in get_last_structured_output"
582 );
583 })
584 .ok()?;
585 let state = guard
586 .as_ref()?
587 .lock()
588 .inspect_err(|e| {
589 tracing::error!(
590 agent_id = self.id,
591 error = %e,
592 "ChatResponseSharedState mutex poisoned in get_last_structured_output"
593 );
594 })
595 .ok()?;
596 state.structured_output.clone()
597 }
598
599 /// Return the structured output from the last chat response deserialized into `T`.
600 ///
601 /// Returns `None` if there was no structured output on the last response.
602 /// Returns `Some(Err(...))` if the structured output could not be deserialized as `T`.
603 pub fn get_last_structured_output_as<T: serde::de::DeserializeOwned>(
604 &self,
605 ) -> Option<Result<T, serde_json::Error>> {
606 self.get_last_structured_output()
607 .map(serde_json::from_value)
608 }
609
610 /// Return the usage metadata from the last chat response, if any.
611 #[must_use]
612 pub fn get_last_usage(&self) -> Option<UsageMetadata> {
613 let guard = self
614 .last_shared_state
615 .lock()
616 .inspect_err(|e| {
617 tracing::error!(
618 agent_id = self.id,
619 error = %e,
620 "last_shared_state mutex poisoned in get_last_usage"
621 );
622 })
623 .ok()?;
624 let state = guard
625 .as_ref()?
626 .lock()
627 .inspect_err(|e| {
628 tracing::error!(
629 agent_id = self.id,
630 error = %e,
631 "ChatResponseSharedState mutex poisoned in get_last_usage"
632 );
633 })
634 .ok()?;
635 state.usage.clone()
636 }
637
638 /// Send a message without waiting for a response.
639 ///
640 /// Fire-and-forget: the message is delivered to the agent but no
641 /// streaming response is produced.
642 ///
643 /// # Errors
644 ///
645 /// Returns a [`Error`] if sending fails.
646 pub async fn send(&self, content: impl Into<Content>) -> Result<(), Error> {
647 if !self.is_started() {
648 return Err(Error::AgentNotStarted);
649 }
650
651 let content = content.into();
652
653 let max_retries = self.config.max_quota_retries.unwrap_or(0);
654
655 for attempt in 0..=max_retries {
656 if attempt > 0 {
657 self.quota_state.wait_for_quota().await;
658 }
659 match self.runtime.send(self.id, &content).await {
660 Ok(()) => return Ok(()),
661 Err(Error::QuotaExceeded { retry_after }) => {
662 self.handle_quota_error("send", attempt, max_retries, retry_after)?;
663 }
664 Err(ref e) if e.is_quota_error() => {
665 self.handle_quota_error("send", attempt, max_retries, DEFAULT_QUOTA_BACKOFF)?;
666 }
667 Err(e) => return Err(e),
668 }
669 }
670 Err(Error::QuotaExceeded {
671 retry_after: QUOTA_EXHAUSTED_RETRY_AFTER,
672 })
673 }
674
675 /// Signal that this agent is idle and ready to receive input.
676 ///
677 /// # Errors
678 ///
679 /// Returns a [`Error`] if the signal call fails.
680 pub async fn signal_idle(&self) -> Result<(), Error> {
681 self.runtime.signal_idle(self.id).await
682 }
683
684 /// Wait for the agent to wake up, returning `true` if woken or
685 /// `false` if the `timeout` elapsed.
686 ///
687 /// # Errors
688 ///
689 /// Returns a [`Error`] if the wait call fails.
690 pub async fn wait_for_wakeup(&self, timeout: std::time::Duration) -> Result<bool, Error> {
691 self.runtime.wait_for_wakeup(self.id, timeout).await
692 }
693
694 /// Handle a quota/429 error from a retryable operation.
695 fn handle_quota_error(
696 &self,
697 operation: &str,
698 attempt: u32,
699 max_retries: u32,
700 retry_after: std::time::Duration,
701 ) -> Result<(), Error> {
702 if attempt >= max_retries {
703 return Err(Error::QuotaExceeded { retry_after });
704 }
705 tracing::warn!(
706 agent_id = self.id,
707 attempt = attempt + 1,
708 max = max_retries,
709 retry_after_ms = u64::try_from(retry_after.as_millis()).unwrap_or_else(|e| {
710 tracing::warn!("Int conversion failed: {e}");
711 u64::MAX
712 }),
713 "Quota exceeded on {operation} — recording hit and retrying"
714 );
715 self.quota_state.record_quota_hit(retry_after);
716 Ok(())
717 }
718
719 /// Gracefully shut down the agent.
720 ///
721 /// This sends a `ShutdownAgent` command to the Python runtime, which
722 /// calls `__aexit__()` on the SDK agent. The handle remains usable
723 /// for read-only queries (e.g. [`is_started()`](Self::is_started))
724 /// after shutdown.
725 ///
726 /// # Errors
727 ///
728 /// Returns a [`Error`] if shutdown fails. The `is_shutdown`
729 /// flag is always set so the `Drop` impl will not emit a warning.
730 pub async fn shutdown(&self) -> Result<(), Error> {
731 if self.is_shutdown.load(Ordering::SeqCst) {
732 tracing::debug!(agent_id = self.id, "Agent already shut down");
733 return Ok(());
734 }
735
736 tracing::info!(agent_id = self.id, "Shutting down agent");
737 let result = self.runtime.shutdown_agent(self.id).await;
738
739 // Always mark as shut down so Drop doesn't warn, even on failure.
740 self.is_shutdown.store(true, Ordering::SeqCst);
741
742 // Clean up bridge state AFTER the runtime's shutdown completes.
743 // In the live runtime, `__aexit__` fires hooks (e.g. on_session_end)
744 // that look up bridge state — so this must happen after, not before.
745 match crate::runtime::bridge_state().write() {
746 Ok(mut map) => {
747 map.remove(&self.id);
748 }
749 Err(e) => {
750 tracing::error!(
751 agent_id = self.id,
752 error = %e,
753 "BRIDGE_STATE RwLock poisoned during shutdown cleanup — \
754 bridge state entry may leak"
755 );
756 }
757 }
758
759 match result {
760 Ok(()) => {
761 tracing::info!(agent_id = self.id, "Agent shut down successfully");
762 }
763 Err(ref e) => {
764 tracing::error!(agent_id = self.id, error = ?e, "Agent shutdown failed");
765 }
766 }
767
768 result
769 }
770
771 /// Spawn a subagent from the given config, sharing this agent's runtime.
772 ///
773 /// If a `ToolRegistry` is provided and `config.tools` is empty, the
774 /// registry's definitions are automatically applied.
775 ///
776 /// # Errors
777 ///
778 /// Returns a [`Error`] if agent creation fails.
779 pub async fn spawn_subagent(
780 &self,
781 mut config: AgentConfig,
782 registry: impl Into<Option<crate::tools::ToolRegistry>>,
783 ) -> Result<Self, Error> {
784 let opt_registry = registry.into();
785 if let Some(disp) = &opt_registry
786 && config.tools.is_empty()
787 {
788 config.tools = disp.definitions();
789 }
790 let arc_registry = opt_registry.map(Arc::new);
791 Self::new(
792 Arc::clone(&self.runtime),
793 config,
794 arc_registry,
795 None,
796 self.policy_handler.clone(),
797 )
798 .await
799 }
800}
801
802impl<R: Runtime> Drop for AgentHandle<R> {
803 fn drop(&mut self) {
804 if self.is_started.load(Ordering::SeqCst) && !self.is_shutdown.load(Ordering::SeqCst) {
805 tracing::debug!(
806 agent_id = self.id,
807 "AgentHandle dropped without explicit shutdown() — \
808 sending best-effort shutdown signal"
809 );
810 // try_shutdown_agent fires a command that eventually calls
811 // handle_shutdown_agent, which cleans up bridge state AFTER
812 // __aexit__ completes (so on_session_end hooks can still
813 // find the hook runner). Do NOT clean up bridge state here.
814 self.runtime.try_shutdown_agent(self.id);
815 } else if self.is_shutdown.load(Ordering::SeqCst) {
816 // shutdown() was already called — handle_shutdown_agent
817 // already cleaned up bridge state after __aexit__. Nothing
818 // to do.
819 } else {
820 // Agent was never started (e.g. creation failed). Clean up
821 // any partial bridge state that might have been registered.
822 if let Ok(mut map) = crate::runtime::bridge_state().write() {
823 map.remove(&self.id);
824 } else {
825 tracing::error!(
826 agent_id = self.id,
827 "BRIDGE_STATE RwLock poisoned during Drop — \
828 bridge state entry for this agent may leak"
829 );
830 }
831 }
832 }
833}