agy_bridge/agent/mod.rs
//! Agent lifecycle management for the Antigravity SDK bridge.
//!
//! Provides [`AgentHandle`](crate::agent::AgentHandle), which wraps the lifecycle of a single
//! SDK agent: creation, chatting, conversation tracking, and shutdown — either explicitly via
//! `shutdown()` or as a best-effort signal sent when the handle is dropped.
5
6use std::sync::{
7 Arc, Mutex,
8 atomic::{AtomicBool, Ordering},
9};
10
11use crate::{
12 config::AgentConfig,
13 content::Content,
14 error::Error,
15 streaming::{ChatResponseHandle, ChatResponseSharedState},
16 types::{ConversationMessage, UsageMetadata},
17};
18
19/// Default backoff duration when a quota/429 error doesn't include a
20/// `Retry-After` header.
21const DEFAULT_QUOTA_BACKOFF: std::time::Duration = std::time::Duration::from_secs(2);
22
23/// Duration reported to the caller when all quota retries are exhausted.
24const QUOTA_EXHAUSTED_RETRY_AFTER: std::time::Duration = std::time::Duration::from_mins(2);
25
26#[cfg(test)]
27pub(crate) mod mock;
28
/// Numeric ID that identifies one agent inside the bridge.
pub type AgentId = u64;
31
32/// Trait abstracting the Python runtime interface.
33///
34/// This allows unit tests to inject a mock runtime without requiring a live
35/// Python interpreter. The real implementation will call through to `PyO3`.
36#[expect(
37 async_fn_in_trait,
38 reason = "Runtime is not object-safe by design; callers always know the concrete type"
39)]
40pub trait Runtime: Send + Sync {
41 /// Create an agent from the given config, returning its ID.
42 async fn create_agent(&self, config: AgentConfig) -> Result<AgentId, Error>;
43
44 /// Send a chat message to the agent, returning a streaming response handle.
45 ///
46 /// The `content` parameter accepts any [`Content`] variant: plain text,
47 /// images, documents, audio, video, or a multi-part list.
48 async fn chat(&self, agent_id: AgentId, content: &Content)
49 -> Result<ChatResponseHandle, Error>;
50
51 /// Gracefully shut down the agent.
52 async fn shutdown_agent(&self, agent_id: AgentId) -> Result<(), Error>;
53
54 /// Interrupt any active prompt/chat run.
55 async fn cancel(&self, agent_id: AgentId) -> Result<(), Error>;
56
57 /// Wait for the active run or conversational loop to stabilize.
58 async fn wait_for_idle(&self, agent_id: AgentId) -> Result<(), Error>;
59
60 /// Send a message without waiting for completion.
61 async fn send(&self, agent_id: AgentId, content: &Content) -> Result<(), Error>;
62
63 /// Signal that the agent is idle.
64 async fn signal_idle(&self, agent_id: AgentId) -> Result<(), Error>;
65
66 /// Wait for the agent to wake up. Returns true if woken, false if timed out.
67 async fn wait_for_wakeup(
68 &self,
69 agent_id: AgentId,
70 timeout: std::time::Duration,
71 ) -> Result<bool, Error>;
72
73 /// Wait if we're in a quota backoff period.
74 async fn wait_for_quota(&self);
75
76 /// Record a quota hit with the suggested retry duration.
77 async fn record_quota_hit(&self, retry_after: std::time::Duration);
78
79 /// Access this runtime's per-key quota registry.
80 ///
81 /// Each runtime owns its own [`QuotaRegistry`](crate::quota::QuotaRegistry),
82 /// so different runtimes have fully independent quota tracking.
83 fn quota_registry(&self) -> &crate::quota::QuotaRegistry;
84
85 /// Retrieve the conversation's message history.
86 async fn history(&self, agent_id: AgentId) -> Result<Vec<ConversationMessage>, Error>;
87
88 /// Return the number of completed turns in the conversation.
89 async fn turn_count(&self, agent_id: AgentId) -> Result<u32, Error>;
90
91 /// Return cumulative token usage across all turns.
92 async fn total_usage(&self, agent_id: AgentId) -> Result<UsageMetadata, Error>;
93
94 /// Return token usage from the most recent turn only.
95 async fn last_turn_usage(&self, agent_id: AgentId) -> Result<UsageMetadata, Error>;
96
97 /// Clear the conversation history and reset state.
98 async fn clear_history(&self, agent_id: AgentId) -> Result<(), Error>;
99
100 /// Return the text of the last model response, if any.
101 ///
102 /// Default implementation returns `Ok(None)`.
103 async fn last_response(&self, _agent_id: AgentId) -> Result<Option<String>, Error> {
104 Ok(None)
105 }
106
107 /// Return the step indices at which compaction occurred.
108 ///
109 /// Default implementation returns an empty list.
110 async fn compaction_indices(&self, _agent_id: AgentId) -> Result<Vec<u32>, Error> {
111 Ok(Vec::new())
112 }
113
114 /// Delete the conversation and all associated state.
115 ///
116 /// Default implementation is a no-op that returns `Ok(())`.
117 async fn delete(&self, _agent_id: AgentId) -> Result<(), Error> {
118 Ok(())
119 }
120
121 /// Disconnect from the agent without deleting state.
122 ///
123 /// Default implementation is a no-op that returns `Ok(())`.
124 async fn disconnect(&self, _agent_id: AgentId) -> Result<(), Error> {
125 Ok(())
126 }
127
128 /// Check whether the agent is currently idle (not running a turn).
129 ///
130 /// Default implementation returns `Ok(true)`.
131 async fn is_idle(&self, _agent_id: AgentId) -> Result<bool, Error> {
132 Ok(true)
133 }
134
135 /// Best-effort synchronous shutdown signal, called from [`Drop`].
136 ///
137 /// Unlike [`shutdown_agent`](Self::shutdown_agent), this is sync and
138 /// fire-and-forget — it cannot return errors. The default is a no-op;
139 /// implementations backed by a command channel should `try_send` a
140 /// shutdown command here.
141 fn try_shutdown_agent(&self, _agent_id: AgentId) {}
142}
143
144/// Handle to a running agent.
145///
146/// Wraps the agent's lifecycle: creation, chat, and shutdown.
147///
148/// Call [`shutdown()`](Self::shutdown) for a clean, error-reported shutdown.
149/// If the handle is dropped without calling `shutdown()`, a best-effort
150/// background shutdown is spawned via [`tokio::spawn`] — the Python agent
151/// will be cleaned up, but errors are only logged, not returned.
152///
153/// Most methods take `&self` — interior mutability is used where needed
154/// so multiple concurrent operations can share a single handle.
155///
156/// # Mutex choice
157///
158/// This type uses [`std::sync::Mutex`] rather than [`tokio::sync::Mutex`]
159/// because every lock acquisition is a brief, synchronous operation (pointer
160/// swap or clone) that **never** spans an `.await` point. For these
161/// microsecond critical sections, `std::sync::Mutex` is both simpler and
162/// lower-overhead than the async alternative.
163pub struct AgentHandle<R: Runtime + 'static> {
164 id: AgentId,
165 runtime: Arc<R>,
166 config: AgentConfig,
167 /// Per-API-key quota state. Agents sharing the same effective API key
168 /// share backoff tracking; agents with different keys are independent.
169 quota_state: Arc<crate::quota::QuotaState>,
170 /// Kept alive for the agent's lifetime so the global `BRIDGE_STATE`
171 /// entry isn't the only strong reference.
172 _registry: Option<Arc<crate::tools::ToolRegistry>>,
173 /// Kept alive to preserve a strong reference to the policy confirmation handler.
174 policy_handler: Option<Arc<dyn crate::policies::AskUserHandler>>,
175 conversation_id: Mutex<Option<String>>,
176 is_started: AtomicBool,
177 is_shutdown: AtomicBool,
178 /// Shared state from the last completed chat response, used to surface
179 /// `get_last_structured_output()` without round-tripping to Python.
180 ///
181 /// Wrapped in a `Mutex` so `chat()` can take `&self` instead of `&mut self`,
182 /// enabling concurrent usage patterns. The lock is brief (pointer swap only).
183 last_shared_state: Mutex<Option<Arc<Mutex<ChatResponseSharedState>>>>,
184}
185
186impl<R: Runtime> AgentHandle<R> {
187 /// Create a new agent from the given runtime and configuration.
188 ///
189 /// This sends a `CreateAgent` command to the Python runtime, waits for
190 /// quota availability, and returns the handle.
191 ///
192 /// # Errors
193 ///
194 /// Returns a [`Error`] if agent creation fails (e.g. invalid config,
195 /// Python error, or quota exceeded).
196 pub async fn new(
197 runtime: Arc<R>,
198 config: AgentConfig,
199 registry: Option<Arc<crate::tools::ToolRegistry>>,
200 hook_runner: Option<Arc<crate::hooks::Hooks>>,
201 policy_handler: Option<Arc<dyn crate::policies::AskUserHandler>>,
202 ) -> Result<Self, Error> {
203 let quota_key = config.effective_api_key().unwrap_or_default();
204 let quota_state = runtime.quota_registry().state_for_key("a_key);
205 let id = if hook_runner.is_some() {
206 // Serialize the set→create→clear sequence so concurrent creates
207 // cannot overwrite each other's temporary hook runner.
208 //
209 // NOTE: These must remain process-global because the Python-side
210 // callback (`dispatch_rust_hook`) is itself process-global — it is
211 // registered in `sys.modules["_agy_bridge_globals"]` and has no way
212 // to identify which runtime instance triggered it. A per-runtime
213 // guard would not prevent cross-runtime races on the shared
214 // INITIALIZING_HOOK_RUNNER slot.
215 let _guard = crate::runtime::CREATE_AGENT_HOOK_GUARD.lock().await;
216 if let Ok(mut opt) = crate::runtime::INITIALIZING_HOOK_RUNNER.lock() {
217 *opt = hook_runner.as_ref().map(Arc::clone);
218 } else {
219 tracing::error!("INITIALIZING_HOOK_RUNNER mutex poisoned — hook may not fire");
220 }
221 let result = runtime.create_agent(config.clone()).await;
222 if let Ok(mut opt) = crate::runtime::INITIALIZING_HOOK_RUNNER.lock() {
223 *opt = None;
224 } else {
225 tracing::error!("INITIALIZING_HOOK_RUNNER mutex poisoned — stale hook may persist");
226 }
227 result?
228 } else {
229 runtime.create_agent(config.clone()).await?
230 };
231
232 tracing::info!(agent_id = id, "Agent created successfully");
233
234 // Build and insert per-agent bridge state in a single lock acquisition.
235 let policies_set = crate::policies::PolicySet::validated_from(config.policies.clone())?;
236 let bridge_entry = crate::runtime::AgentBridgeState {
237 registry: registry.as_ref().map(Arc::clone),
238 hook_runner: hook_runner.as_ref().map(Arc::clone),
239 policies: policies_set,
240 policy_handler: policy_handler.as_ref().map(Arc::clone),
241 tool_state: Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())),
242 };
243 if let Ok(mut map) = crate::runtime::bridge_state().write() {
244 map.insert(id, bridge_entry);
245 } else {
246 tracing::error!(
247 agent_id = id,
248 "Failed to acquire write lock on BRIDGE_STATE"
249 );
250 }
251
252 let conversation_id = Mutex::new(config.conversation_id.clone());
253 Ok(Self {
254 id,
255 runtime,
256 config,
257 quota_state,
258 _registry: registry,
259 policy_handler,
260 conversation_id,
261 is_started: AtomicBool::new(true),
262 is_shutdown: AtomicBool::new(false),
263 last_shared_state: Mutex::new(None),
264 })
265 }
266
267 /// Send a message and receive a streaming response.
268 ///
269 /// Accepts any type that converts into [`Content`]: `&str`, `String`,
270 /// [`Image`](crate::content::Image), [`Document`](crate::content::Document),
271 /// [`Audio`](crate::content::Audio), [`Video`](crate::content::Video), or a
272 /// `Vec<ContentPrimitive>` for multimodal input.
273 ///
274 /// Automatically backs off on quota limits (HTTP 429).
275 ///
276 /// # Errors
277 ///
278 /// Returns a [`Error`] on chat failure (Python error, timeout, etc.).
279 pub async fn chat(&self, content: impl Into<Content>) -> Result<ChatResponseHandle, Error> {
280 if !self.is_started() {
281 return Err(Error::AgentNotStarted);
282 }
283
284 let content = content.into();
285 let max_retries = self.config.max_quota_retries.unwrap_or(0);
286
287 let handle = 'retry: {
288 for attempt in 0..=max_retries {
289 if attempt > 0 {
290 self.quota_state.wait_for_quota().await;
291 }
292 match self.runtime.chat(self.id, &content).await {
293 Ok(h) => break 'retry h,
294 Err(Error::QuotaExceeded { retry_after }) => {
295 self.handle_quota_error("chat", attempt, max_retries, retry_after)?;
296 }
297 Err(ref e) if e.is_quota_error() => {
298 self.handle_quota_error(
299 "chat",
300 attempt,
301 max_retries,
302 DEFAULT_QUOTA_BACKOFF,
303 )?;
304 }
305 Err(e) => return Err(e),
306 }
307 }
308 return Err(Error::QuotaExceeded {
309 retry_after: QUOTA_EXHAUSTED_RETRY_AFTER,
310 });
311 };
312
313 if let Ok(mut guard) = self.last_shared_state.lock() {
314 *guard = Some(Arc::clone(&handle.shared_state));
315 } else {
316 tracing::error!("last_shared_state mutex poisoned — streaming metadata may be stale");
317 }
318 Ok(handle)
319 }
320
321 /// Send a message and return the final text response.
322 ///
323 /// This is a convenience wrapper around [`chat`](Self::chat) that drains
324 /// the streaming response into a single `String`. If tools were associated
325 /// with the agent at creation time, the Python runtime handles tool
326 /// execution automatically.
327 ///
328 /// # Errors
329 ///
330 /// Returns [`Error`] if the chat turn fails or stream errors occur.
331 pub async fn chat_text(&self, message: impl Into<Content>) -> Result<String, Error> {
332 let response = self.chat(message.into()).await?;
333 let text = response.text().await.map_err(|e| {
334 let converted = Error::from(e);
335 if matches!(converted, Error::Safety) {
336 converted
337 } else {
338 Error::BackendError {
339 message: format!("Failed to read response text: {converted}"),
340 }
341 }
342 })?;
343 Ok(text.into_string())
344 }
345
346 /// Return the current conversation ID, if one has been set.
347 ///
348 /// Returns a cloned `String` because the underlying value is behind a
349 /// [`Mutex`] (interior mutability for `&self` access).
350 #[must_use]
351 pub fn conversation_id(&self) -> Option<String> {
352 self.conversation_id
353 .lock()
354 .inspect_err(|e| {
355 tracing::error!(
356 agent_id = self.id,
357 error = %e,
358 "conversation_id mutex poisoned"
359 );
360 })
361 .ok()
362 .and_then(|guard| guard.clone())
363 }
364
365 /// Set the conversation ID (called when the SDK assigns one).
366 ///
367 /// Takes `&self` rather than `&mut self` so the handle can be shared
368 /// across concurrent tasks.
369 pub fn set_conversation_id(&self, id: String) {
370 if let Ok(mut guard) = self.conversation_id.lock() {
371 *guard = Some(id);
372 } else {
373 tracing::error!("Failed to acquire lock on conversation_id");
374 }
375 }
376
377 /// Check whether the agent has been started and is not yet shut down.
378 #[must_use]
379 pub fn is_started(&self) -> bool {
380 self.is_started.load(Ordering::SeqCst) && !self.is_shutdown.load(Ordering::SeqCst)
381 }
382
383 /// Return the agent's unique identifier.
384 #[must_use]
385 pub const fn id(&self) -> AgentId {
386 self.id
387 }
388
389 /// Return a reference to the agent's configuration.
390 #[must_use]
391 pub const fn config(&self) -> &AgentConfig {
392 &self.config
393 }
394
395 /// Interrupt the active chat prompt execution.
396 ///
397 /// # Errors
398 ///
399 /// Returns a [`Error`] if the cancellation call fails.
400 pub async fn cancel(&self) -> Result<(), Error> {
401 self.runtime.cancel(self.id).await
402 }
403
404 /// Wait for the conversation or active run to stabilize and become idle.
405 ///
406 /// # Errors
407 ///
408 /// Returns a [`Error`] if the wait call fails.
409 pub async fn wait_for_idle(&self) -> Result<(), Error> {
410 self.runtime.wait_for_idle(self.id).await
411 }
412
413 /// Retrieve the conversation's message history.
414 ///
415 /// # Errors
416 ///
417 /// Returns [`Error`] if the query fails.
418 pub async fn history(&self) -> Result<Vec<ConversationMessage>, Error> {
419 self.runtime.history(self.id).await
420 }
421
422 /// Return the number of completed turns in the conversation.
423 ///
424 /// # Errors
425 ///
426 /// Returns [`Error`] if the query fails.
427 pub async fn turn_count(&self) -> Result<u32, Error> {
428 self.runtime.turn_count(self.id).await
429 }
430
431 /// Return cumulative token usage across all turns.
432 ///
433 /// # Errors
434 ///
435 /// Returns [`Error`] if the query fails.
436 pub async fn total_usage(&self) -> Result<UsageMetadata, Error> {
437 self.runtime.total_usage(self.id).await
438 }
439
440 /// Return token usage from the most recent turn only.
441 ///
442 /// # Errors
443 ///
444 /// Returns [`Error`] if the query fails.
445 pub async fn last_turn_usage(&self) -> Result<UsageMetadata, Error> {
446 self.runtime.last_turn_usage(self.id).await
447 }
448
449 /// Clear the conversation history and reset state.
450 ///
451 /// # Errors
452 ///
453 /// Returns [`Error`] if the operation fails.
454 pub async fn clear_history(&self) -> Result<(), Error> {
455 self.runtime.clear_history(self.id).await
456 }
457
458 /// Return the text of the last model response, if any.
459 ///
460 /// # Errors
461 ///
462 /// Returns [`Error`] if the query fails.
463 pub async fn last_response(&self) -> Result<Option<String>, Error> {
464 self.runtime.last_response(self.id).await
465 }
466
467 /// Return the step indices at which conversation compaction occurred.
468 ///
469 /// # Errors
470 ///
471 /// Returns [`Error`] if the query fails.
472 pub async fn compaction_indices(&self) -> Result<Vec<u32>, Error> {
473 self.runtime.compaction_indices(self.id).await
474 }
475
476 /// Delete the conversation and all associated state.
477 ///
478 /// After calling this method, the agent handle is no longer usable
479 /// for chat operations. This also marks the agent as shut down.
480 ///
481 /// # Errors
482 ///
483 /// Returns [`Error`] if the delete operation fails.
484 pub async fn delete(&self) -> Result<(), Error> {
485 let result = self.runtime.delete(self.id).await;
486 self.is_shutdown.store(true, Ordering::SeqCst);
487 result
488 }
489
490 /// Disconnect from the agent without deleting its state.
491 ///
492 /// The agent's conversation state is preserved but this handle
493 /// can no longer send messages. Marks the agent as shut down.
494 ///
495 /// # Errors
496 ///
497 /// Returns [`Error`] if the disconnect operation fails.
498 pub async fn disconnect(&self) -> Result<(), Error> {
499 let result = self.runtime.disconnect(self.id).await;
500 self.is_shutdown.store(true, Ordering::SeqCst);
501 result
502 }
503
504 /// Check whether the agent is currently idle (not running a turn).
505 ///
506 /// # Errors
507 ///
508 /// Returns [`Error`] if the query fails.
509 pub async fn is_idle(&self) -> Result<bool, Error> {
510 self.runtime.is_idle(self.id).await
511 }
512
513 /// Return the structured output from the last chat response, if any.
514 ///
515 /// Only populated after a [`chat()`](Self::chat) round-trip when the
516 /// agent was configured with a `response_schema` and the model returned
517 /// a valid JSON payload.
518 #[must_use]
519 pub fn get_last_structured_output(&self) -> Option<serde_json::Value> {
520 let guard = self
521 .last_shared_state
522 .lock()
523 .inspect_err(|e| {
524 tracing::error!(
525 agent_id = self.id,
526 error = %e,
527 "last_shared_state mutex poisoned in get_last_structured_output"
528 );
529 })
530 .ok()?;
531 let state = guard
532 .as_ref()?
533 .lock()
534 .inspect_err(|e| {
535 tracing::error!(
536 agent_id = self.id,
537 error = %e,
538 "ChatResponseSharedState mutex poisoned in get_last_structured_output"
539 );
540 })
541 .ok()?;
542 state.structured_output.clone()
543 }
544
545 /// Return the usage metadata from the last chat response, if any.
546 #[must_use]
547 pub fn get_last_usage(&self) -> Option<UsageMetadata> {
548 let guard = self
549 .last_shared_state
550 .lock()
551 .inspect_err(|e| {
552 tracing::error!(
553 agent_id = self.id,
554 error = %e,
555 "last_shared_state mutex poisoned in get_last_usage"
556 );
557 })
558 .ok()?;
559 let state = guard
560 .as_ref()?
561 .lock()
562 .inspect_err(|e| {
563 tracing::error!(
564 agent_id = self.id,
565 error = %e,
566 "ChatResponseSharedState mutex poisoned in get_last_usage"
567 );
568 })
569 .ok()?;
570 state.usage.clone()
571 }
572
573 /// Send a message without waiting for a response.
574 ///
575 /// Fire-and-forget: the message is delivered to the agent but no
576 /// streaming response is produced.
577 ///
578 /// # Errors
579 ///
580 /// Returns a [`Error`] if sending fails.
581 pub async fn send(&self, content: impl Into<Content>) -> Result<(), Error> {
582 if !self.is_started() {
583 return Err(Error::AgentNotStarted);
584 }
585
586 let content = content.into();
587
588 let max_retries = self.config.max_quota_retries.unwrap_or(0);
589
590 for attempt in 0..=max_retries {
591 if attempt > 0 {
592 self.quota_state.wait_for_quota().await;
593 }
594 match self.runtime.send(self.id, &content).await {
595 Ok(()) => return Ok(()),
596 Err(Error::QuotaExceeded { retry_after }) => {
597 self.handle_quota_error("send", attempt, max_retries, retry_after)?;
598 }
599 Err(ref e) if e.is_quota_error() => {
600 self.handle_quota_error("send", attempt, max_retries, DEFAULT_QUOTA_BACKOFF)?;
601 }
602 Err(e) => return Err(e),
603 }
604 }
605 Err(Error::QuotaExceeded {
606 retry_after: QUOTA_EXHAUSTED_RETRY_AFTER,
607 })
608 }
609
610 /// Signal that this agent is idle and ready to receive input.
611 ///
612 /// # Errors
613 ///
614 /// Returns a [`Error`] if the signal call fails.
615 pub async fn signal_idle(&self) -> Result<(), Error> {
616 self.runtime.signal_idle(self.id).await
617 }
618
619 /// Wait for the agent to wake up, returning `true` if woken or
620 /// `false` if the `timeout` elapsed.
621 ///
622 /// # Errors
623 ///
624 /// Returns a [`Error`] if the wait call fails.
625 pub async fn wait_for_wakeup(&self, timeout: std::time::Duration) -> Result<bool, Error> {
626 self.runtime.wait_for_wakeup(self.id, timeout).await
627 }
628
629 /// Handle a quota/429 error from a retryable operation.
630 fn handle_quota_error(
631 &self,
632 operation: &str,
633 attempt: u32,
634 max_retries: u32,
635 retry_after: std::time::Duration,
636 ) -> Result<(), Error> {
637 if attempt >= max_retries {
638 return Err(Error::QuotaExceeded { retry_after });
639 }
640 tracing::warn!(
641 agent_id = self.id,
642 attempt = attempt + 1,
643 max = max_retries,
644 retry_after_ms = u64::try_from(retry_after.as_millis()).unwrap_or_else(|e| {
645 tracing::warn!("Int conversion failed: {e}");
646 u64::MAX
647 }),
648 "Quota exceeded on {operation} — recording hit and retrying"
649 );
650 self.quota_state.record_quota_hit(retry_after);
651 Ok(())
652 }
653
654 /// Gracefully shut down the agent.
655 ///
656 /// This sends a `ShutdownAgent` command to the Python runtime, which
657 /// calls `__aexit__()` on the SDK agent. The handle remains usable
658 /// for read-only queries (e.g. [`is_started()`](Self::is_started))
659 /// after shutdown.
660 ///
661 /// # Errors
662 ///
663 /// Returns a [`Error`] if shutdown fails. The `is_shutdown`
664 /// flag is always set so the `Drop` impl will not emit a warning.
665 pub async fn shutdown(&self) -> Result<(), Error> {
666 if self.is_shutdown.load(Ordering::SeqCst) {
667 tracing::debug!(agent_id = self.id, "Agent already shut down");
668 return Ok(());
669 }
670
671 tracing::info!(agent_id = self.id, "Shutting down agent");
672 let result = self.runtime.shutdown_agent(self.id).await;
673
674 // Always mark as shut down so Drop doesn't warn, even on failure.
675 self.is_shutdown.store(true, Ordering::SeqCst);
676
677 match result {
678 Ok(()) => {
679 tracing::info!(agent_id = self.id, "Agent shut down successfully");
680 }
681 Err(ref e) => {
682 tracing::error!(agent_id = self.id, error = ?e, "Agent shutdown failed");
683 }
684 }
685
686 result
687 }
688
689 /// Spawn a subagent from the given config, sharing this agent's runtime.
690 ///
691 /// If a `ToolRegistry` is provided and `config.tools` is empty, the
692 /// registry's definitions are automatically applied.
693 ///
694 /// # Errors
695 ///
696 /// Returns a [`Error`] if agent creation fails.
697 pub async fn spawn_subagent(
698 &self,
699 mut config: AgentConfig,
700 registry: impl Into<Option<crate::tools::ToolRegistry>>,
701 ) -> Result<Self, Error> {
702 let opt_registry = registry.into();
703 if let Some(disp) = &opt_registry
704 && config.tools.is_empty()
705 {
706 config.tools = disp.definitions();
707 }
708 let arc_registry = opt_registry.map(Arc::new);
709 Self::new(
710 Arc::clone(&self.runtime),
711 config,
712 arc_registry,
713 None,
714 self.policy_handler.clone(),
715 )
716 .await
717 }
718}
719
720impl<R: Runtime> Drop for AgentHandle<R> {
721 fn drop(&mut self) {
722 if self.is_started.load(Ordering::SeqCst) && !self.is_shutdown.load(Ordering::SeqCst) {
723 tracing::debug!(
724 agent_id = self.id,
725 "AgentHandle dropped without explicit shutdown() — \
726 sending best-effort shutdown signal"
727 );
728 self.runtime.try_shutdown_agent(self.id);
729 }
730
731 // Clean up global bridge state entry.
732 if let Ok(mut map) = crate::runtime::bridge_state().write() {
733 map.remove(&self.id);
734 } else {
735 tracing::error!(
736 agent_id = self.id,
737 "BRIDGE_STATE RwLock poisoned during Drop — \
738 bridge state entry for this agent may leak"
739 );
740 }
741 }
742}