atomr_agents_deep_research_harness/
boxed.rs1use std::sync::Arc;
4
5use async_trait::async_trait;
6use atomr_agents_callable::Callable;
7use atomr_agents_core::{CallCtx, Result as CoreResult, RunId, Value};
8use atomr_agents_deep_research_core::{ResearchRequest, ResearchResult, ResearchState};
9use atomr_agents_observability::EventBus;
10use atomr_agents_retriever::Retriever;
11use atomr_agents_web_search_core::WebSearch;
12use parking_lot::Mutex;
13use tokio::sync::broadcast;
14
15use crate::dispatch::DeepResearchHarnessDispatch;
16use crate::error::{DeepResearchError, Result};
17use crate::events::{DeepResearchEvent, DeepResearchEventStream};
18use crate::handle::ResearchHandle;
19use crate::harness::{run_loop, DeepResearchRoles};
20use crate::loop_strategy::DeepResearchLoopStrategy;
21use crate::spec::DeepResearchHarnessSpec;
22use crate::state::DeepResearchState;
23use crate::store::ResearchStore;
24use crate::termination::DeepResearchTermination;
25
26pub struct BoxedDeepResearchHarness {
28 pub spec: DeepResearchHarnessSpec,
29 pub store: Arc<dyn ResearchStore>,
30 pub search: Arc<dyn WebSearch>,
31 pub retriever: Option<Arc<dyn Retriever>>,
32 pub roles: DeepResearchRoles,
33 pub loop_strategy: Box<dyn DeepResearchLoopStrategy>,
34 pub termination: Box<dyn DeepResearchTermination>,
35 pub bus: EventBus,
36 pub(crate) event_tx: broadcast::Sender<DeepResearchEvent>,
37 pub(crate) cancel: Arc<parking_lot::Mutex<bool>>,
38}
39
40impl BoxedDeepResearchHarness {
41 pub fn new(
42 spec: DeepResearchHarnessSpec,
43 store: Arc<dyn ResearchStore>,
44 search: Arc<dyn WebSearch>,
45 roles: DeepResearchRoles,
46 loop_strategy: Box<dyn DeepResearchLoopStrategy>,
47 termination: Box<dyn DeepResearchTermination>,
48 ) -> Self {
49 let (event_tx, _) = broadcast::channel(512);
50 Self {
51 spec,
52 store,
53 search,
54 retriever: None,
55 roles,
56 loop_strategy,
57 termination,
58 bus: EventBus::new(),
59 event_tx,
60 cancel: Arc::new(parking_lot::Mutex::new(false)),
61 }
62 }
63
64 pub fn with_retriever(mut self, retriever: Arc<dyn Retriever>) -> Self {
65 self.retriever = Some(retriever);
66 self
67 }
68
69 pub fn events(&self) -> DeepResearchEventStream {
70 DeepResearchEventStream::new(self.event_tx.subscribe())
71 }
72
73 pub fn event_sender(&self) -> broadcast::Sender<DeepResearchEvent> {
74 self.event_tx.clone()
75 }
76
77 pub fn cancel(&self) {
78 *self.cancel.lock() = true;
79 }
80
81 pub async fn run(&self, request: ResearchRequest) -> Result<ResearchResult> {
82 let mut result = ResearchResult::new(request.query.clone(), self.loop_strategy.name());
83 result.model_id = self.spec.model_id.clone();
84 self.store.put(&result).await?;
85
86 let result_arc = Arc::new(Mutex::new(result.clone()));
87 let request_arc = Arc::new(request);
88 let mut handle = ResearchHandle::new(result_arc.clone(), request_arc.clone(), self.search.clone());
89 if let Some(r) = &self.retriever {
90 handle = handle.with_retriever(r.clone());
91 }
92 let handle = handle.with_events(self.event_tx.clone());
93
94 let _ = self.event_tx.send(DeepResearchEvent::Started {
95 strategy: self.loop_strategy.name().to_string(),
96 query: request_arc.query.clone(),
97 });
98 handle.set_state(ResearchState::Running);
99
100 let mut state = DeepResearchState::new(result_arc.lock().clone(), self.spec.initial_budget.remaining);
101 let run_id = RunId::new();
102 let final_reason = run_loop(
103 &self.spec,
104 &run_id,
105 &*self.loop_strategy,
106 &*self.termination,
107 self.store.clone(),
108 &handle,
109 &self.event_tx,
110 self.cancel.clone(),
111 &self.bus,
112 &self.roles,
113 &mut state,
114 )
115 .await;
116 match final_reason {
117 Ok(_) => handle.finalize(),
118 Err(e) => handle.fail(e.to_string()),
119 }
120 let final_result = result_arc.lock().clone();
121 self.store.put(&final_result).await?;
122 Ok(final_result)
123 }
124}
125
126#[async_trait]
127impl Callable for BoxedDeepResearchHarness {
128 async fn call(&self, input: Value, _ctx: CallCtx) -> CoreResult<Value> {
129 let request = crate::dispatch::parse_request(input)?;
130 let result = self.run(request).await?;
131 Ok(serde_json::to_value(result).map_err(DeepResearchError::from)?)
132 }
133 fn label(&self) -> &str {
134 self.spec.id.as_str()
135 }
136}
137
138#[async_trait]
139impl DeepResearchHarnessDispatch for BoxedDeepResearchHarness {
140 async fn dispatch(&self, request: ResearchRequest) -> CoreResult<Value> {
141 let result = self.run(request).await?;
142 Ok(serde_json::to_value(result).map_err(DeepResearchError::from)?)
143 }
144}