agy_bridge/agent/mod.rs
1//! Agent lifecycle management for the Antigravity SDK bridge.
2//!
3//! Provides [`AgentHandle`](crate::agent::AgentHandle) which wraps the lifecycle of a single SDK agent:
4//! creation, chatting, conversation tracking, and shutdown with RAII warnings.
5
6use std::sync::{
7 Arc, Mutex,
8 atomic::{AtomicBool, Ordering},
9};
10
11use crate::{
12 config::AgentConfig,
13 content::Content,
14 error::Error,
15 streaming::{ChatResponseHandle, ChatResponseSharedState},
16 types::{ConversationMessage, UsageMetadata},
17};
18
19/// Default backoff duration when a quota/429 error doesn't include a
20/// `Retry-After` header.
21const DEFAULT_QUOTA_BACKOFF: std::time::Duration = std::time::Duration::from_secs(2);
22
23/// Duration reported to the caller when all quota retries are exhausted.
24const QUOTA_EXHAUSTED_RETRY_AFTER: std::time::Duration = std::time::Duration::from_mins(2);
25
26#[cfg(test)]
27pub(crate) mod mock;
28
/// Numeric handle that identifies one agent inside the bridge.
pub type AgentId = u64;
31
32/// Trait abstracting the Python runtime interface.
33///
34/// This allows unit tests to inject a mock runtime without requiring a live
35/// Python interpreter. The real implementation will call through to `PyO3`.
36#[expect(
37 async_fn_in_trait,
38 reason = "Runtime is not object-safe by design; callers always know the concrete type"
39)]
40pub trait Runtime: Send + Sync {
41 /// Create an agent from the given config, returning its ID.
42 async fn create_agent(&self, config: AgentConfig) -> Result<AgentId, Error>;
43
44 /// Send a chat message to the agent, returning a streaming response handle.
45 ///
46 /// The `content` parameter accepts any [`Content`] variant: plain text,
47 /// images, documents, audio, video, or a multi-part list.
48 async fn chat(&self, agent_id: AgentId, content: &Content)
49 -> Result<ChatResponseHandle, Error>;
50
51 /// Gracefully shut down the agent.
52 async fn shutdown_agent(&self, agent_id: AgentId) -> Result<(), Error>;
53
54 /// Interrupt any active prompt/chat run.
55 async fn cancel(&self, agent_id: AgentId) -> Result<(), Error>;
56
57 /// Wait for the active run or conversational loop to stabilize.
58 async fn wait_for_idle(&self, agent_id: AgentId) -> Result<(), Error>;
59
60 /// Send a message without waiting for completion.
61 async fn send(&self, agent_id: AgentId, content: &Content) -> Result<(), Error>;
62
63 /// Signal that the agent is idle.
64 async fn signal_idle(&self, agent_id: AgentId) -> Result<(), Error>;
65
66 /// Wait for the agent to wake up. Returns true if woken, false if timed out.
67 async fn wait_for_wakeup(
68 &self,
69 agent_id: AgentId,
70 timeout: std::time::Duration,
71 ) -> Result<bool, Error>;
72
73 /// Wait if we're in a quota backoff period.
74 async fn wait_for_quota(&self);
75
76 /// Record a quota hit with the suggested retry duration.
77 async fn record_quota_hit(&self, retry_after: std::time::Duration);
78
79 /// Access this runtime's per-key quota registry.
80 ///
81 /// Each runtime owns its own [`QuotaRegistry`](crate::quota::QuotaRegistry),
82 /// so different runtimes have fully independent quota tracking.
83 fn quota_registry(&self) -> &crate::quota::QuotaRegistry;
84
85 /// Retrieve the conversation's message history.
86 async fn history(&self, agent_id: AgentId) -> Result<Vec<ConversationMessage>, Error>;
87
88 /// Return the number of completed turns in the conversation.
89 async fn turn_count(&self, agent_id: AgentId) -> Result<u32, Error>;
90
91 /// Return cumulative token usage across all turns.
92 async fn total_usage(&self, agent_id: AgentId) -> Result<UsageMetadata, Error>;
93
94 /// Return token usage from the most recent turn only.
95 async fn last_turn_usage(&self, agent_id: AgentId) -> Result<UsageMetadata, Error>;
96
97 /// Clear the conversation history and reset state.
98 async fn clear_history(&self, agent_id: AgentId) -> Result<(), Error>;
99
100 /// Return the text of the last model response, if any.
101 ///
102 /// Default implementation returns `Ok(None)`.
103 async fn last_response(&self, _agent_id: AgentId) -> Result<Option<String>, Error> {
104 Ok(None)
105 }
106
107 /// Return the step indices at which compaction occurred.
108 ///
109 /// Default implementation returns an empty list.
110 async fn compaction_indices(&self, _agent_id: AgentId) -> Result<Vec<u32>, Error> {
111 Ok(Vec::new())
112 }
113
114 /// Delete the conversation and all associated state.
115 ///
116 /// Default implementation is a no-op that returns `Ok(())`.
117 async fn delete(&self, _agent_id: AgentId) -> Result<(), Error> {
118 Ok(())
119 }
120
121 /// Disconnect from the agent without deleting state.
122 ///
123 /// Default implementation is a no-op that returns `Ok(())`.
124 async fn disconnect(&self, _agent_id: AgentId) -> Result<(), Error> {
125 Ok(())
126 }
127
128 /// Check whether the agent is currently idle (not running a turn).
129 ///
130 /// Default implementation returns `Ok(true)`.
131 async fn is_idle(&self, _agent_id: AgentId) -> Result<bool, Error> {
132 Ok(true)
133 }
134
135 /// Best-effort synchronous shutdown signal, called from [`Drop`].
136 ///
137 /// Unlike [`shutdown_agent`](Self::shutdown_agent), this is sync and
138 /// fire-and-forget — it cannot return errors. The default is a no-op;
139 /// implementations backed by a command channel should `try_send` a
140 /// shutdown command here.
141 fn try_shutdown_agent(&self, _agent_id: AgentId) {}
142}
143
144/// Handle to a running agent.
145///
146/// Wraps the agent's lifecycle: creation, chat, and shutdown.
147///
148/// Call [`shutdown()`](Self::shutdown) for a clean, error-reported shutdown.
149/// If the handle is dropped without calling `shutdown()`, a best-effort
150/// background shutdown is spawned via [`tokio::spawn`] — the Python agent
151/// will be cleaned up, but errors are only logged, not returned.
152///
153/// Most methods take `&self` — interior mutability is used where needed
154/// so multiple concurrent operations can share a single handle.
155///
156/// # Mutex choice
157///
158/// This type uses [`std::sync::Mutex`] rather than [`tokio::sync::Mutex`]
159/// because every lock acquisition is a brief, synchronous operation (pointer
160/// swap or clone) that **never** spans an `.await` point. For these
161/// microsecond critical sections, `std::sync::Mutex` is both simpler and
162/// lower-overhead than the async alternative.
163pub struct AgentHandle<R: Runtime + 'static> {
164 id: AgentId,
165 runtime: Arc<R>,
166 config: AgentConfig,
167 /// Per-API-key quota state. Agents sharing the same effective API key
168 /// share backoff tracking; agents with different keys are independent.
169 quota_state: Arc<crate::quota::QuotaState>,
170 /// Kept alive for the agent's lifetime so the global `BRIDGE_STATE`
171 /// entry isn't the only strong reference.
172 _registry: Option<Arc<crate::tools::ToolRegistry>>,
173 /// Kept alive to preserve a strong reference to the policy confirmation handler.
174 policy_handler: Option<Arc<dyn crate::policies::AskUserHandler>>,
175 conversation_id: Arc<Mutex<Option<String>>>,
176 is_started: AtomicBool,
177 is_shutdown: AtomicBool,
178 /// Shared state from the last completed chat response, used to surface
179 /// `get_last_structured_output()` without round-tripping to Python.
180 ///
181 /// Wrapped in a `Mutex` so `chat()` can take `&self` instead of `&mut self`,
182 /// enabling concurrent usage patterns. The lock is brief (pointer swap only).
183 last_shared_state: Mutex<Option<Arc<Mutex<ChatResponseSharedState>>>>,
184}
185
186impl<R: Runtime> AgentHandle<R> {
187 /// Create a new agent from the given runtime and configuration.
188 ///
189 /// This sends a `CreateAgent` command to the Python runtime, waits for
190 /// quota availability, and returns the handle.
191 ///
192 /// # Errors
193 ///
194 /// Returns a [`Error`] if agent creation fails (e.g. invalid config,
195 /// Python error, or quota exceeded).
196 #[expect(
197 clippy::too_many_lines,
198 reason = "Agent initialization is a sequential multi-step setup"
199 )]
200 pub async fn new(
201 runtime: Arc<R>,
202 config: AgentConfig,
203 registry: Option<Arc<crate::tools::ToolRegistry>>,
204 hook_runner: Option<Arc<crate::hooks::Hooks>>,
205 policy_handler: Option<Arc<dyn crate::policies::AskUserHandler>>,
206 ) -> Result<Self, Error> {
207 let quota_key = config.effective_api_key().unwrap_or_default();
208 let quota_state = runtime.quota_registry().state_for_key("a_key);
209
210 let mut config = config;
211 let has_session_start = config
212 .hooks
213 .iter()
214 .any(|h| h.point == crate::hooks::HookPoint::OnSessionStart);
215 if !has_session_start {
216 config.hooks.push(crate::hooks::HookEntry {
217 name: "builtin_session_sync".to_string(),
218 point: crate::hooks::HookPoint::OnSessionStart,
219 callback_id: "builtin_session_sync".to_string(),
220 });
221 }
222
223 // We must hold CREATE_AGENT_HOOK_GUARD across both agent creation and
224 // bridge_state insertion. This prevents a race where an asynchronous hook
225 // (like on_session_start) fires in Python after create_agent returns but
226 // before the agent is inserted into bridge_state.
227 let _guard = crate::runtime::CREATE_AGENT_HOOK_GUARD.lock().await;
228
229 let effective_hook_runner =
230 hook_runner.unwrap_or_else(|| Arc::new(crate::hooks::Hooks::new()));
231
232 match crate::runtime::INITIALIZING_HOOK_RUNNER.lock() {
233 Ok(mut opt) => {
234 *opt = Some(Arc::clone(&effective_hook_runner));
235 }
236 Err(e) => {
237 return Err(Error::BackendError {
238 message: format!(
239 "INITIALIZING_HOOK_RUNNER mutex poisoned — hooks cannot be installed: {e}"
240 ),
241 });
242 }
243 }
244
245 let id = runtime.create_agent(config.clone()).await?;
246 tracing::info!(agent_id = id, "Agent created successfully");
247
248 // Build and insert per-agent bridge state in a single lock acquisition.
249 let policies_set = crate::policies::PolicySet::validated_from(config.policies.clone())?;
250 let mut initial_conv_id = config.conversation_id.clone();
251 if initial_conv_id.is_none()
252 && let Ok(mut guard) = crate::runtime::PENDING_CONVERSATION_IDS.lock()
253 && let Some(pending_id) = guard.remove(&id)
254 {
255 initial_conv_id = Some(pending_id);
256 }
257 let conversation_id = Arc::new(Mutex::new(initial_conv_id));
258 let bridge_entry = crate::runtime::AgentBridgeState {
259 registry: registry.as_ref().map(Arc::clone),
260 hook_runner: Some(effective_hook_runner),
261 policies: policies_set,
262 policy_handler: policy_handler.as_ref().map(Arc::clone),
263 tool_state: Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())),
264 conversation_id: Arc::clone(&conversation_id),
265 };
266 let bridge_insert_failed = match crate::runtime::bridge_state().write() {
267 Ok(mut map) => {
268 map.insert(id, bridge_entry);
269 false
270 }
271 Err(e) => {
272 tracing::error!(
273 agent_id = id,
274 error = %e,
275 "Failed to acquire write lock on BRIDGE_STATE — agent would be unusable"
276 );
277 true
278 }
279 };
280 // Guard is dropped here — safe to .await below.
281 if bridge_insert_failed {
282 // Best-effort shutdown the agent we just created before returning the error.
283 if let Err(shutdown_err) = runtime.shutdown_agent(id).await {
284 tracing::error!(
285 agent_id = id,
286 error = ?shutdown_err,
287 "Failed to shut down agent after BRIDGE_STATE lock failure"
288 );
289 }
290 return Err(Error::BackendError {
291 message: "BRIDGE_STATE RwLock poisoned during agent creation".to_string(),
292 });
293 }
294
295 match crate::runtime::INITIALIZING_HOOK_RUNNER.lock() {
296 Ok(mut opt) => {
297 *opt = None;
298 }
299 Err(e) => {
300 // The agent is already created at this point. Log at error
301 // level — a stale hook runner may cause the next agent
302 // creation to pick up the wrong hooks, but the current
303 // agent is functional.
304 tracing::error!(
305 agent_id = id,
306 error = %e,
307 "INITIALIZING_HOOK_RUNNER mutex poisoned during cleanup — \
308 stale hook runner may persist"
309 );
310 }
311 }
312
313 Ok(Self {
314 id,
315 runtime,
316 config,
317 quota_state,
318 _registry: registry,
319 policy_handler,
320 conversation_id,
321 is_started: AtomicBool::new(true),
322 is_shutdown: AtomicBool::new(false),
323 last_shared_state: Mutex::new(None),
324 })
325 }
326
327 /// Send a message and receive a streaming response.
328 ///
329 /// Accepts any type that converts into [`Content`]: `&str`, `String`,
330 /// [`Image`](crate::content::Image), [`Document`](crate::content::Document),
331 /// [`Audio`](crate::content::Audio), [`Video`](crate::content::Video), or a
332 /// `Vec<ContentPrimitive>` for multimodal input.
333 ///
334 /// Automatically backs off on quota limits (HTTP 429).
335 ///
336 /// # Errors
337 ///
338 /// Returns a [`Error`] on chat failure (Python error, timeout, etc.).
339 pub async fn chat(&self, content: impl Into<Content>) -> Result<ChatResponseHandle, Error> {
340 if !self.is_started() {
341 return Err(Error::AgentNotStarted);
342 }
343
344 let content = content.into();
345 let max_retries = self.config.max_quota_retries.unwrap_or(0);
346
347 let handle = 'retry: {
348 for attempt in 0..=max_retries {
349 if attempt > 0 {
350 self.quota_state.wait_for_quota().await;
351 }
352 match self.runtime.chat(self.id, &content).await {
353 Ok(h) => break 'retry h,
354 Err(Error::QuotaExceeded { retry_after }) => {
355 self.handle_quota_error("chat", attempt, max_retries, retry_after)?;
356 }
357 Err(ref e) if e.is_quota_error() => {
358 self.handle_quota_error(
359 "chat",
360 attempt,
361 max_retries,
362 DEFAULT_QUOTA_BACKOFF,
363 )?;
364 }
365 Err(e) => return Err(e),
366 }
367 }
368 return Err(Error::QuotaExceeded {
369 retry_after: QUOTA_EXHAUSTED_RETRY_AFTER,
370 });
371 };
372
373 match self.last_shared_state.lock() {
374 Ok(mut guard) => {
375 *guard = Some(Arc::clone(&handle.shared_state));
376 }
377 Err(e) => {
378 tracing::error!(
379 agent_id = self.id,
380 error = %e,
381 "last_shared_state mutex poisoned — streaming metadata may be stale"
382 );
383 }
384 }
385 Ok(handle)
386 }
387
388 /// Send a message and return the final text response.
389 ///
390 /// This is a convenience wrapper around [`chat`](Self::chat) that drains
391 /// the streaming response into a single `String`. If tools were associated
392 /// with the agent at creation time, the Python runtime handles tool
393 /// execution automatically.
394 ///
395 /// # Errors
396 ///
397 /// Returns [`Error`] if the chat turn fails or stream errors occur.
398 pub async fn chat_text(&self, message: impl Into<Content>) -> Result<String, Error> {
399 let response = self.chat(message.into()).await?;
400 let text = response.text().await.map_err(|e| {
401 let converted = Error::from(e);
402 if matches!(converted, Error::Safety) {
403 converted
404 } else {
405 Error::BackendError {
406 message: format!("Failed to read response text: {converted}"),
407 }
408 }
409 })?;
410 Ok(text.into_string())
411 }
412
413 /// Return the current conversation ID, if one has been set.
414 ///
415 /// Returns a cloned `String` because the underlying value is behind a
416 /// [`Mutex`] (interior mutability for `&self` access).
417 #[must_use]
418 pub fn conversation_id(&self) -> Option<String> {
419 self.conversation_id
420 .lock()
421 .inspect_err(|e| {
422 tracing::error!(
423 agent_id = self.id,
424 error = %e,
425 "conversation_id mutex poisoned"
426 );
427 })
428 .ok()
429 .and_then(|guard| guard.clone())
430 }
431
432 /// Set the conversation ID (called when the SDK assigns one).
433 ///
434 /// Takes `&self` rather than `&mut self` so the handle can be shared
435 /// across concurrent tasks.
436 pub fn set_conversation_id(&self, id: String) {
437 match self.conversation_id.lock() {
438 Ok(mut guard) => {
439 *guard = Some(id);
440 }
441 Err(e) => {
442 tracing::error!(
443 agent_id = self.id,
444 error = %e,
445 "conversation_id mutex poisoned — ID will not be updated"
446 );
447 }
448 }
449 }
450
451 /// Check whether the agent has been started and is not yet shut down.
452 #[must_use]
453 pub fn is_started(&self) -> bool {
454 self.is_started.load(Ordering::SeqCst) && !self.is_shutdown.load(Ordering::SeqCst)
455 }
456
457 /// Return the agent's unique identifier.
458 #[must_use]
459 pub const fn id(&self) -> AgentId {
460 self.id
461 }
462
463 /// Return a reference to the agent's configuration.
464 #[must_use]
465 pub const fn config(&self) -> &AgentConfig {
466 &self.config
467 }
468
469 /// Interrupt the active chat prompt execution.
470 ///
471 /// # Errors
472 ///
473 /// Returns a [`Error`] if the cancellation call fails.
474 pub async fn cancel(&self) -> Result<(), Error> {
475 self.runtime.cancel(self.id).await
476 }
477
478 /// Wait for the conversation or active run to stabilize and become idle.
479 ///
480 /// # Errors
481 ///
482 /// Returns a [`Error`] if the wait call fails.
483 pub async fn wait_for_idle(&self) -> Result<(), Error> {
484 self.runtime.wait_for_idle(self.id).await
485 }
486
487 /// Retrieve the conversation's message history.
488 ///
489 /// # Errors
490 ///
491 /// Returns [`Error`] if the query fails.
492 pub async fn history(&self) -> Result<Vec<ConversationMessage>, Error> {
493 self.runtime.history(self.id).await
494 }
495
496 /// Return the number of completed turns in the conversation.
497 ///
498 /// # Errors
499 ///
500 /// Returns [`Error`] if the query fails.
501 pub async fn turn_count(&self) -> Result<u32, Error> {
502 self.runtime.turn_count(self.id).await
503 }
504
505 /// Return cumulative token usage across all turns.
506 ///
507 /// # Errors
508 ///
509 /// Returns [`Error`] if the query fails.
510 pub async fn total_usage(&self) -> Result<UsageMetadata, Error> {
511 self.runtime.total_usage(self.id).await
512 }
513
514 /// Return token usage from the most recent turn only.
515 ///
516 /// # Errors
517 ///
518 /// Returns [`Error`] if the query fails.
519 pub async fn last_turn_usage(&self) -> Result<UsageMetadata, Error> {
520 self.runtime.last_turn_usage(self.id).await
521 }
522
523 /// Clear the conversation history and reset state.
524 ///
525 /// # Errors
526 ///
527 /// Returns [`Error`] if the operation fails.
528 pub async fn clear_history(&self) -> Result<(), Error> {
529 self.runtime.clear_history(self.id).await
530 }
531
532 /// Return the text of the last model response, if any.
533 ///
534 /// # Errors
535 ///
536 /// Returns [`Error`] if the query fails.
537 pub async fn last_response(&self) -> Result<Option<String>, Error> {
538 self.runtime.last_response(self.id).await
539 }
540
541 /// Return the step indices at which conversation compaction occurred.
542 ///
543 /// # Errors
544 ///
545 /// Returns [`Error`] if the query fails.
546 pub async fn compaction_indices(&self) -> Result<Vec<u32>, Error> {
547 self.runtime.compaction_indices(self.id).await
548 }
549
550 /// Delete the conversation and all associated state.
551 ///
552 /// After calling this method, the agent handle is no longer usable
553 /// for chat operations. This also marks the agent as shut down.
554 ///
555 /// # Errors
556 ///
557 /// Returns [`Error`] if the delete operation fails.
558 pub async fn delete(&self) -> Result<(), Error> {
559 let result = self.runtime.delete(self.id).await;
560 self.is_shutdown.store(true, Ordering::SeqCst);
561 result
562 }
563
564 /// Disconnect from the agent without deleting its state.
565 ///
566 /// The agent's conversation state is preserved but this handle
567 /// can no longer send messages. Marks the agent as shut down.
568 ///
569 /// # Errors
570 ///
571 /// Returns [`Error`] if the disconnect operation fails.
572 pub async fn disconnect(&self) -> Result<(), Error> {
573 let result = self.runtime.disconnect(self.id).await;
574 self.is_shutdown.store(true, Ordering::SeqCst);
575 result
576 }
577
578 /// Check whether the agent is currently idle (not running a turn).
579 ///
580 /// # Errors
581 ///
582 /// Returns [`Error`] if the query fails.
583 pub async fn is_idle(&self) -> Result<bool, Error> {
584 self.runtime.is_idle(self.id).await
585 }
586
587 /// Return the structured output from the last chat response, if any.
588 ///
589 /// Only populated after a [`chat()`](Self::chat) round-trip when the
590 /// agent was configured with a `response_schema` and the model returned
591 /// a valid JSON payload.
592 #[must_use]
593 pub fn get_last_structured_output(&self) -> Option<serde_json::Value> {
594 let guard = self
595 .last_shared_state
596 .lock()
597 .inspect_err(|e| {
598 tracing::error!(
599 agent_id = self.id,
600 error = %e,
601 "last_shared_state mutex poisoned in get_last_structured_output"
602 );
603 })
604 .ok()?;
605 let state = guard
606 .as_ref()?
607 .lock()
608 .inspect_err(|e| {
609 tracing::error!(
610 agent_id = self.id,
611 error = %e,
612 "ChatResponseSharedState mutex poisoned in get_last_structured_output"
613 );
614 })
615 .ok()?;
616 state.structured_output.clone()
617 }
618
619 /// Return the structured output from the last chat response deserialized into `T`.
620 ///
621 /// Returns `None` if there was no structured output on the last response.
622 /// Returns `Some(Err(...))` if the structured output could not be deserialized as `T`.
623 pub fn get_last_structured_output_as<T: serde::de::DeserializeOwned>(
624 &self,
625 ) -> Option<Result<T, serde_json::Error>> {
626 self.get_last_structured_output()
627 .map(serde_json::from_value)
628 }
629
630 /// Return the usage metadata from the last chat response, if any.
631 #[must_use]
632 pub fn get_last_usage(&self) -> Option<UsageMetadata> {
633 let guard = self
634 .last_shared_state
635 .lock()
636 .inspect_err(|e| {
637 tracing::error!(
638 agent_id = self.id,
639 error = %e,
640 "last_shared_state mutex poisoned in get_last_usage"
641 );
642 })
643 .ok()?;
644 let state = guard
645 .as_ref()?
646 .lock()
647 .inspect_err(|e| {
648 tracing::error!(
649 agent_id = self.id,
650 error = %e,
651 "ChatResponseSharedState mutex poisoned in get_last_usage"
652 );
653 })
654 .ok()?;
655 state.usage.clone()
656 }
657
658 /// Send a message without waiting for a response.
659 ///
660 /// Fire-and-forget: the message is delivered to the agent but no
661 /// streaming response is produced.
662 ///
663 /// # Errors
664 ///
665 /// Returns a [`Error`] if sending fails.
666 pub async fn send(&self, content: impl Into<Content>) -> Result<(), Error> {
667 if !self.is_started() {
668 return Err(Error::AgentNotStarted);
669 }
670
671 let content = content.into();
672
673 let max_retries = self.config.max_quota_retries.unwrap_or(0);
674
675 for attempt in 0..=max_retries {
676 if attempt > 0 {
677 self.quota_state.wait_for_quota().await;
678 }
679 match self.runtime.send(self.id, &content).await {
680 Ok(()) => return Ok(()),
681 Err(Error::QuotaExceeded { retry_after }) => {
682 self.handle_quota_error("send", attempt, max_retries, retry_after)?;
683 }
684 Err(ref e) if e.is_quota_error() => {
685 self.handle_quota_error("send", attempt, max_retries, DEFAULT_QUOTA_BACKOFF)?;
686 }
687 Err(e) => return Err(e),
688 }
689 }
690 Err(Error::QuotaExceeded {
691 retry_after: QUOTA_EXHAUSTED_RETRY_AFTER,
692 })
693 }
694
695 /// Signal that this agent is idle and ready to receive input.
696 ///
697 /// # Errors
698 ///
699 /// Returns a [`Error`] if the signal call fails.
700 pub async fn signal_idle(&self) -> Result<(), Error> {
701 self.runtime.signal_idle(self.id).await
702 }
703
704 /// Wait for the agent to wake up, returning `true` if woken or
705 /// `false` if the `timeout` elapsed.
706 ///
707 /// # Errors
708 ///
709 /// Returns a [`Error`] if the wait call fails.
710 pub async fn wait_for_wakeup(&self, timeout: std::time::Duration) -> Result<bool, Error> {
711 self.runtime.wait_for_wakeup(self.id, timeout).await
712 }
713
714 /// Handle a quota/429 error from a retryable operation.
715 fn handle_quota_error(
716 &self,
717 operation: &str,
718 attempt: u32,
719 max_retries: u32,
720 retry_after: std::time::Duration,
721 ) -> Result<(), Error> {
722 if attempt >= max_retries {
723 return Err(Error::QuotaExceeded { retry_after });
724 }
725 tracing::warn!(
726 agent_id = self.id,
727 attempt = attempt + 1,
728 max = max_retries,
729 retry_after_ms = u64::try_from(retry_after.as_millis()).unwrap_or_else(|e| {
730 tracing::warn!("Int conversion failed: {e}");
731 u64::MAX
732 }),
733 "Quota exceeded on {operation} — recording hit and retrying"
734 );
735 self.quota_state.record_quota_hit(retry_after);
736 Ok(())
737 }
738
739 /// Gracefully shut down the agent.
740 ///
741 /// This sends a `ShutdownAgent` command to the Python runtime, which
742 /// calls `__aexit__()` on the SDK agent. The handle remains usable
743 /// for read-only queries (e.g. [`is_started()`](Self::is_started))
744 /// after shutdown.
745 ///
746 /// # Errors
747 ///
748 /// Returns a [`Error`] if shutdown fails. The `is_shutdown`
749 /// flag is always set so the `Drop` impl will not emit a warning.
750 pub async fn shutdown(&self) -> Result<(), Error> {
751 if self.is_shutdown.load(Ordering::SeqCst) {
752 tracing::debug!(agent_id = self.id, "Agent already shut down");
753 return Ok(());
754 }
755
756 tracing::info!(agent_id = self.id, "Shutting down agent");
757 let result = self.runtime.shutdown_agent(self.id).await;
758
759 // Always mark as shut down so Drop doesn't warn, even on failure.
760 self.is_shutdown.store(true, Ordering::SeqCst);
761
762 // Clean up bridge state AFTER the runtime's shutdown completes.
763 // In the live runtime, `__aexit__` fires hooks (e.g. on_session_end)
764 // that look up bridge state — so this must happen after, not before.
765 match crate::runtime::bridge_state().write() {
766 Ok(mut map) => {
767 map.remove(&self.id);
768 }
769 Err(e) => {
770 tracing::error!(
771 agent_id = self.id,
772 error = %e,
773 "BRIDGE_STATE RwLock poisoned during shutdown cleanup — \
774 bridge state entry may leak"
775 );
776 }
777 }
778
779 match result {
780 Ok(()) => {
781 tracing::info!(agent_id = self.id, "Agent shut down successfully");
782 }
783 Err(ref e) => {
784 tracing::error!(agent_id = self.id, error = ?e, "Agent shutdown failed");
785 }
786 }
787
788 result
789 }
790
791 /// Spawn a subagent from the given config, sharing this agent's runtime.
792 ///
793 /// If a `ToolRegistry` is provided and `config.tools` is empty, the
794 /// registry's definitions are automatically applied.
795 ///
796 /// # Errors
797 ///
798 /// Returns a [`Error`] if agent creation fails.
799 pub async fn spawn_subagent(
800 &self,
801 mut config: AgentConfig,
802 registry: impl Into<Option<crate::tools::ToolRegistry>>,
803 ) -> Result<Self, Error> {
804 let opt_registry = registry.into();
805 if let Some(disp) = &opt_registry
806 && config.tools.is_empty()
807 {
808 config.tools = disp.definitions();
809 }
810 let arc_registry = opt_registry.map(Arc::new);
811 Self::new(
812 Arc::clone(&self.runtime),
813 config,
814 arc_registry,
815 None,
816 self.policy_handler.clone(),
817 )
818 .await
819 }
820}
821
822impl<R: Runtime> Drop for AgentHandle<R> {
823 fn drop(&mut self) {
824 if self.is_started.load(Ordering::SeqCst) && !self.is_shutdown.load(Ordering::SeqCst) {
825 tracing::debug!(
826 agent_id = self.id,
827 "AgentHandle dropped without explicit shutdown() — \
828 sending best-effort shutdown signal"
829 );
830 // try_shutdown_agent fires a command that eventually calls
831 // handle_shutdown_agent, which cleans up bridge state AFTER
832 // __aexit__ completes (so on_session_end hooks can still
833 // find the hook runner). Do NOT clean up bridge state here.
834 self.runtime.try_shutdown_agent(self.id);
835 } else if self.is_shutdown.load(Ordering::SeqCst) {
836 // shutdown() was already called — handle_shutdown_agent
837 // already cleaned up bridge state after __aexit__. Nothing
838 // to do.
839 } else {
840 // Agent was never started (e.g. creation failed). Clean up
841 // any partial bridge state that might have been registered.
842 if let Ok(mut map) = crate::runtime::bridge_state().write() {
843 map.remove(&self.id);
844 } else {
845 tracing::error!(
846 agent_id = self.id,
847 "BRIDGE_STATE RwLock poisoned during Drop — \
848 bridge state entry for this agent may leak"
849 );
850 }
851 }
852 }
853}