agy_bridge/agent/mod.rs
1//! Agent lifecycle management for the Antigravity SDK bridge.
2//!
3//! Provides [`AgentHandle`](crate::agent::AgentHandle) which wraps the lifecycle of a single SDK agent:
4//! creation, chatting, conversation tracking, and shutdown with RAII warnings.
5
6use std::sync::{
7 Arc, Mutex,
8 atomic::{AtomicBool, Ordering},
9};
10
11use crate::{
12 config::AgentConfig,
13 content::Content,
14 error::Error,
15 streaming::{ChatResponseHandle, ChatResponseSharedState},
16 types::{ConversationMessage, UsageMetadata},
17};
18
/// Fallback backoff used when a quota (HTTP 429-style) failure does not carry
/// its own retry duration, e.g. because no `Retry-After` value was supplied.
const DEFAULT_QUOTA_BACKOFF: std::time::Duration = std::time::Duration::from_millis(2_000);
22
/// `retry_after` value carried by the `Error::QuotaExceeded` that `chat` and
/// `send` return when their retry loop runs to completion without a success.
const QUOTA_EXHAUSTED_RETRY_AFTER: std::time::Duration = std::time::Duration::from_secs(2 * 60);
25
26#[cfg(test)]
27pub(crate) mod mock;
28
/// Identifier the bridge uses to address a single agent.
///
/// Values are produced by [`Runtime::create_agent`] and passed back on every
/// later call that targets that agent.
pub type AgentId = u64;
31
32/// Trait abstracting the Python runtime interface.
33///
34/// This allows unit tests to inject a mock runtime without requiring a live
35/// Python interpreter. The real implementation will call through to `PyO3`.
36#[expect(
37 async_fn_in_trait,
38 reason = "Runtime is not object-safe by design; callers always know the concrete type"
39)]
40pub trait Runtime: Send + Sync {
41 /// Create an agent from the given config, returning its ID.
42 async fn create_agent(&self, config: AgentConfig) -> Result<AgentId, Error>;
43
44 /// Send a chat message to the agent, returning a streaming response handle.
45 ///
46 /// The `content` parameter accepts any [`Content`] variant: plain text,
47 /// images, documents, audio, video, or a multi-part list.
48 async fn chat(&self, agent_id: AgentId, content: &Content)
49 -> Result<ChatResponseHandle, Error>;
50
51 /// Gracefully shut down the agent.
52 async fn shutdown_agent(&self, agent_id: AgentId) -> Result<(), Error>;
53
54 /// Interrupt any active prompt/chat run.
55 async fn cancel(&self, agent_id: AgentId) -> Result<(), Error>;
56
57 /// Wait for the active run or conversational loop to stabilize.
58 async fn wait_for_idle(&self, agent_id: AgentId) -> Result<(), Error>;
59
60 /// Send a message without waiting for completion.
61 async fn send(&self, agent_id: AgentId, content: &Content) -> Result<(), Error>;
62
63 /// Signal that the agent is idle.
64 async fn signal_idle(&self, agent_id: AgentId) -> Result<(), Error>;
65
66 /// Wait for the agent to wake up. Returns true if woken, false if timed out.
67 async fn wait_for_wakeup(
68 &self,
69 agent_id: AgentId,
70 timeout: std::time::Duration,
71 ) -> Result<bool, Error>;
72
73 /// Acquire a keep-alive permit during quota waits.
74 async fn acquire_quota_keep_alive_permit(&self) -> Option<tokio::sync::OwnedSemaphorePermit> {
75 None
76 }
77
78 /// Wait if we're in a quota backoff period.
79 async fn wait_for_quota(&self);
80
81 /// Record a quota hit with the suggested retry duration.
82 async fn record_quota_hit(&self, retry_after: std::time::Duration);
83
84 /// Access this runtime's per-key quota registry.
85 ///
86 /// Each runtime owns its own [`QuotaRegistry`](crate::quota::QuotaRegistry),
87 /// so different runtimes have fully independent quota tracking.
88 fn quota_registry(&self) -> &crate::quota::QuotaRegistry;
89
90 /// Retrieve the conversation's message history.
91 async fn history(&self, agent_id: AgentId) -> Result<Vec<ConversationMessage>, Error>;
92
93 /// Return the number of completed turns in the conversation.
94 async fn turn_count(&self, agent_id: AgentId) -> Result<u32, Error>;
95
96 /// Return cumulative token usage across all turns.
97 async fn total_usage(&self, agent_id: AgentId) -> Result<UsageMetadata, Error>;
98
99 /// Return token usage from the most recent turn only.
100 async fn last_turn_usage(&self, agent_id: AgentId) -> Result<UsageMetadata, Error>;
101
102 /// Clear the conversation history and reset state.
103 async fn clear_history(&self, agent_id: AgentId) -> Result<(), Error>;
104
105 /// Return the text of the last model response, if any.
106 ///
107 /// Default implementation returns `Ok(None)`.
108 async fn last_response(&self, _agent_id: AgentId) -> Result<Option<String>, Error> {
109 Ok(None)
110 }
111
112 /// Return the step indices at which compaction occurred.
113 ///
114 /// Default implementation returns an empty list.
115 async fn compaction_indices(&self, _agent_id: AgentId) -> Result<Vec<u32>, Error> {
116 Ok(Vec::new())
117 }
118
119 /// Delete the conversation and all associated state.
120 ///
121 /// Default implementation is a no-op that returns `Ok(())`.
122 async fn delete(&self, _agent_id: AgentId) -> Result<(), Error> {
123 Ok(())
124 }
125
126 /// Disconnect from the agent without deleting state.
127 ///
128 /// Default implementation is a no-op that returns `Ok(())`.
129 async fn disconnect(&self, _agent_id: AgentId) -> Result<(), Error> {
130 Ok(())
131 }
132
133 /// Check whether the agent is currently idle (not running a turn).
134 ///
135 /// Default implementation returns `Ok(true)`.
136 async fn is_idle(&self, _agent_id: AgentId) -> Result<bool, Error> {
137 Ok(true)
138 }
139
140 /// Best-effort synchronous shutdown signal, called from [`Drop`].
141 ///
142 /// Unlike [`shutdown_agent`](Self::shutdown_agent), this is sync and
143 /// fire-and-forget — it cannot return errors. The default is a no-op;
144 /// implementations backed by a command channel should `try_send` a
145 /// shutdown command here.
146 fn try_shutdown_agent(&self, _agent_id: AgentId) {}
147}
148
149/// Handle to a running agent.
150///
151/// Wraps the agent's lifecycle: creation, chat, and shutdown.
152///
153/// Call [`shutdown()`](Self::shutdown) for a clean, error-reported shutdown.
154/// If the handle is dropped without calling `shutdown()`, a best-effort
155/// background shutdown is spawned via [`tokio::spawn`] — the Python agent
156/// will be cleaned up, but errors are only logged, not returned.
157///
158/// Most methods take `&self` — interior mutability is used where needed
159/// so multiple concurrent operations can share a single handle.
160///
161/// # Mutex choice
162///
163/// This type uses [`std::sync::Mutex`] rather than [`tokio::sync::Mutex`]
164/// because every lock acquisition is a brief, synchronous operation (pointer
165/// swap or clone) that **never** spans an `.await` point. For these
166/// microsecond critical sections, `std::sync::Mutex` is both simpler and
167/// lower-overhead than the async alternative.
168pub struct AgentHandle<R: Runtime + 'static> {
169 id: AgentId,
170 runtime: Arc<R>,
171 config: AgentConfig,
172 /// Per-API-key quota state. Agents sharing the same effective API key
173 /// share backoff tracking; agents with different keys are independent.
174 quota_state: Arc<crate::quota::QuotaState>,
175 /// Kept alive for the agent's lifetime so the global `BRIDGE_STATE`
176 /// entry isn't the only strong reference.
177 _registry: Option<Arc<crate::tools::ToolRegistry>>,
178 /// Kept alive to preserve a strong reference to the policy confirmation handler.
179 policy_handler: Option<Arc<dyn crate::policies::AskUserHandler>>,
180 conversation_id: Mutex<Option<String>>,
181 is_started: AtomicBool,
182 is_shutdown: AtomicBool,
183 /// Shared state from the last completed chat response, used to surface
184 /// `get_last_structured_output()` without round-tripping to Python.
185 ///
186 /// Wrapped in a `Mutex` so `chat()` can take `&self` instead of `&mut self`,
187 /// enabling concurrent usage patterns. The lock is brief (pointer swap only).
188 last_shared_state: Mutex<Option<Arc<Mutex<ChatResponseSharedState>>>>,
189}
190
191impl<R: Runtime> AgentHandle<R> {
192 /// Create a new agent from the given runtime and configuration.
193 ///
194 /// This sends a `CreateAgent` command to the Python runtime, waits for
195 /// quota availability, and returns the handle.
196 ///
197 /// # Errors
198 ///
199 /// Returns a [`Error`] if agent creation fails (e.g. invalid config,
200 /// Python error, or quota exceeded).
201 pub async fn new(
202 runtime: Arc<R>,
203 config: AgentConfig,
204 registry: Option<Arc<crate::tools::ToolRegistry>>,
205 hook_runner: Option<Arc<crate::hooks::Hooks>>,
206 policy_handler: Option<Arc<dyn crate::policies::AskUserHandler>>,
207 ) -> Result<Self, Error> {
208 let quota_key = config.effective_api_key().unwrap_or_default();
209 let quota_state = runtime.quota_registry().state_for_key("a_key);
210 quota_state.wait_for_quota().await;
211 let id = if hook_runner.is_some() {
212 // Serialize the set→create→clear sequence so concurrent creates
213 // cannot overwrite each other's temporary hook runner.
214 //
215 // NOTE: These must remain process-global because the Python-side
216 // callback (`dispatch_rust_hook`) is itself process-global — it is
217 // registered in `sys.modules["_agy_bridge_globals"]` and has no way
218 // to identify which runtime instance triggered it. A per-runtime
219 // guard would not prevent cross-runtime races on the shared
220 // INITIALIZING_HOOK_RUNNER slot.
221 let _guard = crate::runtime::CREATE_AGENT_HOOK_GUARD.lock().await;
222 if let Ok(mut opt) = crate::runtime::INITIALIZING_HOOK_RUNNER.lock() {
223 *opt = hook_runner.as_ref().map(Arc::clone);
224 } else {
225 tracing::error!("INITIALIZING_HOOK_RUNNER mutex poisoned — hook may not fire");
226 }
227 let result = runtime.create_agent(config.clone()).await;
228 if let Ok(mut opt) = crate::runtime::INITIALIZING_HOOK_RUNNER.lock() {
229 *opt = None;
230 } else {
231 tracing::error!("INITIALIZING_HOOK_RUNNER mutex poisoned — stale hook may persist");
232 }
233 result?
234 } else {
235 runtime.create_agent(config.clone()).await?
236 };
237
238 tracing::info!(agent_id = id, "Agent created successfully");
239
240 // Build and insert per-agent bridge state in a single lock acquisition.
241 let policies_set = crate::policies::PolicySet::validated_from(config.policies.clone())?;
242 let bridge_entry = crate::runtime::AgentBridgeState {
243 registry: registry.as_ref().map(Arc::clone),
244 hook_runner: hook_runner.as_ref().map(Arc::clone),
245 policies: policies_set,
246 policy_handler: policy_handler.as_ref().map(Arc::clone),
247 tool_state: Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())),
248 };
249 if let Ok(mut map) = crate::runtime::bridge_state().write() {
250 map.insert(id, bridge_entry);
251 } else {
252 tracing::error!(
253 agent_id = id,
254 "Failed to acquire write lock on BRIDGE_STATE"
255 );
256 }
257
258 let conversation_id = Mutex::new(config.conversation_id.clone());
259 Ok(Self {
260 id,
261 runtime,
262 config,
263 quota_state,
264 _registry: registry,
265 policy_handler,
266 conversation_id,
267 is_started: AtomicBool::new(true),
268 is_shutdown: AtomicBool::new(false),
269 last_shared_state: Mutex::new(None),
270 })
271 }
272
273 /// Send a message and receive a streaming response.
274 ///
275 /// Accepts any type that converts into [`Content`]: `&str`, `String`,
276 /// [`Image`](crate::content::Image), [`Document`](crate::content::Document),
277 /// [`Audio`](crate::content::Audio), [`Video`](crate::content::Video), or a
278 /// `Vec<ContentPrimitive>` for multimodal input.
279 ///
280 /// Automatically backs off on quota limits (HTTP 429).
281 ///
282 /// # Errors
283 ///
284 /// Returns a [`Error`] on chat failure (Python error, timeout, etc.).
285 pub async fn chat(&self, content: impl Into<Content>) -> Result<ChatResponseHandle, Error> {
286 if !self.is_started() {
287 return Err(Error::AgentNotStarted);
288 }
289
290 let content = content.into();
291
292 let permit = self.runtime.acquire_quota_keep_alive_permit().await;
293
294 let mut handle = 'retry: {
295 for attempt in 0..=Self::MAX_QUOTA_RETRIES {
296 self.quota_state.wait_for_quota().await;
297 match self.runtime.chat(self.id, &content).await {
298 Ok(h) => break 'retry h,
299 Err(Error::QuotaExceeded { retry_after }) => {
300 self.handle_quota_error("chat", attempt, retry_after)?;
301 }
302 Err(ref e) if e.is_quota_error() => {
303 self.handle_quota_error("chat", attempt, DEFAULT_QUOTA_BACKOFF)?;
304 }
305 Err(e) => return Err(e),
306 }
307 }
308 return Err(Error::QuotaExceeded {
309 retry_after: QUOTA_EXHAUSTED_RETRY_AFTER,
310 });
311 };
312
313 handle.keep_alive_permit = permit;
314 if let Ok(mut guard) = self.last_shared_state.lock() {
315 *guard = Some(Arc::clone(&handle.shared_state));
316 } else {
317 tracing::error!("last_shared_state mutex poisoned — streaming metadata may be stale");
318 }
319 Ok(handle)
320 }
321
322 /// Send a message and return the final text response.
323 ///
324 /// This is a convenience wrapper around [`chat`](Self::chat) that drains
325 /// the streaming response into a single `String`. If tools were associated
326 /// with the agent at creation time, the Python runtime handles tool
327 /// execution automatically.
328 ///
329 /// # Errors
330 ///
331 /// Returns [`Error`] if the chat turn fails or stream errors occur.
332 pub async fn chat_text(&self, message: impl Into<Content>) -> Result<String, Error> {
333 let response = self.chat(message.into()).await?;
334 self.drain_text_with_retry(response).await
335 }
336
337 /// Drain text from a [`ChatResponseHandle`], retrying on 429 quota errors.
338 async fn drain_text_with_retry(&self, response: ChatResponseHandle) -> Result<String, Error> {
339 match response.text().await {
340 Ok(result) => Ok(result.into_string()),
341 Err(e)
342 if e.message.contains("429")
343 || e.message.contains("503")
344 || e.message.contains("RESOURCE_EXHAUSTED") =>
345 {
346 Err(Error::QuotaExceeded {
347 retry_after: DEFAULT_QUOTA_BACKOFF,
348 })
349 }
350 Err(e) => {
351 let converted = Error::from(e);
352 if matches!(converted, Error::Safety) {
353 Err(converted)
354 } else {
355 Err(Error::BackendError {
356 message: format!("Failed to read response text: {converted}"),
357 })
358 }
359 }
360 }
361 }
362
363 /// Return the current conversation ID, if one has been set.
364 ///
365 /// Returns a cloned `String` because the underlying value is behind a
366 /// [`Mutex`] (interior mutability for `&self` access).
367 #[must_use]
368 pub fn conversation_id(&self) -> Option<String> {
369 self.conversation_id
370 .lock()
371 .inspect_err(|e| {
372 tracing::error!(
373 agent_id = self.id,
374 error = %e,
375 "conversation_id mutex poisoned"
376 );
377 })
378 .ok()
379 .and_then(|guard| guard.clone())
380 }
381
382 /// Set the conversation ID (called when the SDK assigns one).
383 ///
384 /// Takes `&self` rather than `&mut self` so the handle can be shared
385 /// across concurrent tasks.
386 pub fn set_conversation_id(&self, id: String) {
387 if let Ok(mut guard) = self.conversation_id.lock() {
388 *guard = Some(id);
389 } else {
390 tracing::error!("Failed to acquire lock on conversation_id");
391 }
392 }
393
394 /// Check whether the agent has been started and is not yet shut down.
395 #[must_use]
396 pub fn is_started(&self) -> bool {
397 self.is_started.load(Ordering::SeqCst) && !self.is_shutdown.load(Ordering::SeqCst)
398 }
399
400 /// Return the agent's unique identifier.
401 #[must_use]
402 pub const fn id(&self) -> AgentId {
403 self.id
404 }
405
406 /// Return a reference to the agent's configuration.
407 #[must_use]
408 pub const fn config(&self) -> &AgentConfig {
409 &self.config
410 }
411
412 /// Interrupt the active chat prompt execution.
413 ///
414 /// # Errors
415 ///
416 /// Returns a [`Error`] if the cancellation call fails.
417 pub async fn cancel(&self) -> Result<(), Error> {
418 self.runtime.cancel(self.id).await
419 }
420
421 /// Wait for the conversation or active run to stabilize and become idle.
422 ///
423 /// # Errors
424 ///
425 /// Returns a [`Error`] if the wait call fails.
426 pub async fn wait_for_idle(&self) -> Result<(), Error> {
427 self.runtime.wait_for_idle(self.id).await
428 }
429
430 /// Retrieve the conversation's message history.
431 ///
432 /// # Errors
433 ///
434 /// Returns [`Error`] if the query fails.
435 pub async fn history(&self) -> Result<Vec<ConversationMessage>, Error> {
436 self.runtime.history(self.id).await
437 }
438
439 /// Return the number of completed turns in the conversation.
440 ///
441 /// # Errors
442 ///
443 /// Returns [`Error`] if the query fails.
444 pub async fn turn_count(&self) -> Result<u32, Error> {
445 self.runtime.turn_count(self.id).await
446 }
447
448 /// Return cumulative token usage across all turns.
449 ///
450 /// # Errors
451 ///
452 /// Returns [`Error`] if the query fails.
453 pub async fn total_usage(&self) -> Result<UsageMetadata, Error> {
454 self.runtime.total_usage(self.id).await
455 }
456
457 /// Return token usage from the most recent turn only.
458 ///
459 /// # Errors
460 ///
461 /// Returns [`Error`] if the query fails.
462 pub async fn last_turn_usage(&self) -> Result<UsageMetadata, Error> {
463 self.runtime.last_turn_usage(self.id).await
464 }
465
466 /// Clear the conversation history and reset state.
467 ///
468 /// # Errors
469 ///
470 /// Returns [`Error`] if the operation fails.
471 pub async fn clear_history(&self) -> Result<(), Error> {
472 self.runtime.clear_history(self.id).await
473 }
474
475 /// Return the text of the last model response, if any.
476 ///
477 /// # Errors
478 ///
479 /// Returns [`Error`] if the query fails.
480 pub async fn last_response(&self) -> Result<Option<String>, Error> {
481 self.runtime.last_response(self.id).await
482 }
483
484 /// Return the step indices at which conversation compaction occurred.
485 ///
486 /// # Errors
487 ///
488 /// Returns [`Error`] if the query fails.
489 pub async fn compaction_indices(&self) -> Result<Vec<u32>, Error> {
490 self.runtime.compaction_indices(self.id).await
491 }
492
493 /// Delete the conversation and all associated state.
494 ///
495 /// After calling this method, the agent handle is no longer usable
496 /// for chat operations. This also marks the agent as shut down.
497 ///
498 /// # Errors
499 ///
500 /// Returns [`Error`] if the delete operation fails.
501 pub async fn delete(&self) -> Result<(), Error> {
502 let result = self.runtime.delete(self.id).await;
503 self.is_shutdown.store(true, Ordering::SeqCst);
504 result
505 }
506
507 /// Disconnect from the agent without deleting its state.
508 ///
509 /// The agent's conversation state is preserved but this handle
510 /// can no longer send messages. Marks the agent as shut down.
511 ///
512 /// # Errors
513 ///
514 /// Returns [`Error`] if the disconnect operation fails.
515 pub async fn disconnect(&self) -> Result<(), Error> {
516 let result = self.runtime.disconnect(self.id).await;
517 self.is_shutdown.store(true, Ordering::SeqCst);
518 result
519 }
520
521 /// Check whether the agent is currently idle (not running a turn).
522 ///
523 /// # Errors
524 ///
525 /// Returns [`Error`] if the query fails.
526 pub async fn is_idle(&self) -> Result<bool, Error> {
527 self.runtime.is_idle(self.id).await
528 }
529
530 /// Return the structured output from the last chat response, if any.
531 ///
532 /// Only populated after a [`chat()`](Self::chat) round-trip when the
533 /// agent was configured with a `response_schema` and the model returned
534 /// a valid JSON payload.
535 #[must_use]
536 pub fn get_last_structured_output(&self) -> Option<serde_json::Value> {
537 let guard = self
538 .last_shared_state
539 .lock()
540 .inspect_err(|e| {
541 tracing::error!(
542 agent_id = self.id,
543 error = %e,
544 "last_shared_state mutex poisoned in get_last_structured_output"
545 );
546 })
547 .ok()?;
548 let state = guard
549 .as_ref()?
550 .lock()
551 .inspect_err(|e| {
552 tracing::error!(
553 agent_id = self.id,
554 error = %e,
555 "ChatResponseSharedState mutex poisoned in get_last_structured_output"
556 );
557 })
558 .ok()?;
559 state.structured_output.clone()
560 }
561
562 /// Return the usage metadata from the last chat response, if any.
563 #[must_use]
564 pub fn get_last_usage(&self) -> Option<UsageMetadata> {
565 let guard = self
566 .last_shared_state
567 .lock()
568 .inspect_err(|e| {
569 tracing::error!(
570 agent_id = self.id,
571 error = %e,
572 "last_shared_state mutex poisoned in get_last_usage"
573 );
574 })
575 .ok()?;
576 let state = guard
577 .as_ref()?
578 .lock()
579 .inspect_err(|e| {
580 tracing::error!(
581 agent_id = self.id,
582 error = %e,
583 "ChatResponseSharedState mutex poisoned in get_last_usage"
584 );
585 })
586 .ok()?;
587 state.usage.clone()
588 }
589
590 /// Send a message without waiting for a response.
591 ///
592 /// Fire-and-forget: the message is delivered to the agent but no
593 /// streaming response is produced.
594 ///
595 /// # Errors
596 ///
597 /// Returns a [`Error`] if sending fails.
598 pub async fn send(&self, content: impl Into<Content>) -> Result<(), Error> {
599 if !self.is_started() {
600 return Err(Error::AgentNotStarted);
601 }
602
603 let content = content.into();
604
605 for attempt in 0..=Self::MAX_QUOTA_RETRIES {
606 self.quota_state.wait_for_quota().await;
607 match self.runtime.send(self.id, &content).await {
608 Ok(()) => return Ok(()),
609 Err(Error::QuotaExceeded { retry_after }) => {
610 self.handle_quota_error("send", attempt, retry_after)?;
611 }
612 Err(ref e) if e.is_quota_error() => {
613 self.handle_quota_error("send", attempt, DEFAULT_QUOTA_BACKOFF)?;
614 }
615 Err(e) => return Err(e),
616 }
617 }
618 Err(Error::QuotaExceeded {
619 retry_after: QUOTA_EXHAUSTED_RETRY_AFTER,
620 })
621 }
622
623 /// Signal that this agent is idle and ready to receive input.
624 ///
625 /// # Errors
626 ///
627 /// Returns a [`Error`] if the signal call fails.
628 pub async fn signal_idle(&self) -> Result<(), Error> {
629 self.runtime.signal_idle(self.id).await
630 }
631
632 /// Wait for the agent to wake up, returning `true` if woken or
633 /// `false` if the `timeout` elapsed.
634 ///
635 /// # Errors
636 ///
637 /// Returns a [`Error`] if the wait call fails.
638 pub async fn wait_for_wakeup(&self, timeout: std::time::Duration) -> Result<bool, Error> {
639 self.runtime.wait_for_wakeup(self.id, timeout).await
640 }
641
/// Largest attempt index used by the quota retry loops.
///
/// The loops count attempts from `0` through this value inclusive, and
/// `handle_quota_error` gives up when it is called with this index.
const MAX_QUOTA_RETRIES: u32 = 120;
644
645 /// Handle a quota/429 error from a retryable operation.
646 ///
647 /// Returns `Ok(())` if the caller should retry, or `Err` if retries
648 /// are exhausted.
649 fn handle_quota_error(
650 &self,
651 operation: &str,
652 attempt: u32,
653 retry_after: std::time::Duration,
654 ) -> Result<(), Error> {
655 if attempt == Self::MAX_QUOTA_RETRIES {
656 return Err(Error::QuotaExceeded { retry_after });
657 }
658 tracing::warn!(
659 agent_id = self.id,
660 attempt = attempt + 1,
661 max = Self::MAX_QUOTA_RETRIES,
662 retry_after_ms = u64::try_from(retry_after.as_millis()).unwrap_or_else(|e| {
663 tracing::warn!("Int conversion failed: {e}");
664 u64::MAX
665 }),
666 "Quota exceeded on {operation} — recording hit and retrying"
667 );
668 self.quota_state.record_quota_hit(retry_after);
669 Ok(())
670 }
671
672 /// Gracefully shut down the agent.
673 ///
674 /// This sends a `ShutdownAgent` command to the Python runtime, which
675 /// calls `__aexit__()` on the SDK agent. The handle remains usable
676 /// for read-only queries (e.g. [`is_started()`](Self::is_started))
677 /// after shutdown.
678 ///
679 /// # Errors
680 ///
681 /// Returns a [`Error`] if shutdown fails. The `is_shutdown`
682 /// flag is always set so the `Drop` impl will not emit a warning.
683 pub async fn shutdown(&self) -> Result<(), Error> {
684 if self.is_shutdown.load(Ordering::SeqCst) {
685 tracing::debug!(agent_id = self.id, "Agent already shut down");
686 return Ok(());
687 }
688
689 tracing::info!(agent_id = self.id, "Shutting down agent");
690 let result = self.runtime.shutdown_agent(self.id).await;
691
692 // Always mark as shut down so Drop doesn't warn, even on failure.
693 self.is_shutdown.store(true, Ordering::SeqCst);
694
695 match result {
696 Ok(()) => {
697 tracing::info!(agent_id = self.id, "Agent shut down successfully");
698 }
699 Err(ref e) => {
700 tracing::error!(agent_id = self.id, error = ?e, "Agent shutdown failed");
701 }
702 }
703
704 result
705 }
706
707 /// Spawn a subagent from the given config, sharing this agent's runtime.
708 ///
709 /// If a `ToolRegistry` is provided and `config.tools` is empty, the
710 /// registry's definitions are automatically applied.
711 ///
712 /// # Errors
713 ///
714 /// Returns a [`Error`] if agent creation fails.
715 pub async fn spawn_subagent(
716 &self,
717 mut config: AgentConfig,
718 registry: impl Into<Option<crate::tools::ToolRegistry>>,
719 ) -> Result<Self, Error> {
720 let opt_registry = registry.into();
721 if let Some(disp) = &opt_registry
722 && config.tools.is_empty()
723 {
724 config.tools = disp.definitions();
725 }
726 let arc_registry = opt_registry.map(Arc::new);
727 Self::new(
728 Arc::clone(&self.runtime),
729 config,
730 arc_registry,
731 None,
732 self.policy_handler.clone(),
733 )
734 .await
735 }
736}
737
738impl<R: Runtime> Drop for AgentHandle<R> {
739 fn drop(&mut self) {
740 if self.is_started.load(Ordering::SeqCst) && !self.is_shutdown.load(Ordering::SeqCst) {
741 tracing::debug!(
742 agent_id = self.id,
743 "AgentHandle dropped without explicit shutdown() — \
744 sending best-effort shutdown signal"
745 );
746 self.runtime.try_shutdown_agent(self.id);
747 }
748
749 // Clean up global bridge state entry.
750 if let Ok(mut map) = crate::runtime::bridge_state().write() {
751 map.remove(&self.id);
752 } else {
753 tracing::error!(
754 agent_id = self.id,
755 "BRIDGE_STATE RwLock poisoned during Drop — \
756 bridge state entry for this agent may leak"
757 );
758 }
759 }
760}