atomr_agents_deep_research_harness/
harness.rs1use std::sync::Arc;
4
5use async_trait::async_trait;
6use atomr_agents_callable::Callable;
7use atomr_agents_core::{CallCtx, Event, Result as CoreResult, RunId, Value};
8use atomr_agents_deep_research_core::{ResearchRequest, ResearchResult, ResearchState};
9use atomr_agents_observability::EventBus;
10use atomr_agents_retriever::Retriever;
11use atomr_agents_web_search_core::WebSearch;
12use parking_lot::Mutex;
13use tokio::sync::broadcast;
14
15use crate::boxed::BoxedDeepResearchHarness;
16use crate::dispatch::DeepResearchHarnessDispatch;
17use crate::error::{DeepResearchError, Result};
18use crate::events::{DeepResearchEvent, DeepResearchEventStream};
19use crate::handle::ResearchHandle;
20use crate::loop_strategy::{DeepResearchLoopStrategy, DeepResearchStepCtx, DeepResearchStepOutcome};
21use crate::roles::{CitationVerifier, Clarifier, Critic, Planner, Researcher, Writer};
22use crate::spec::DeepResearchHarnessSpec;
23use crate::state::{DeepResearchState, DeepResearchStepEvent};
24use crate::store::ResearchStore;
25use crate::termination::{DeepResearchTermination, Termination};
26
/// Capacity passed to `broadcast::channel` when a harness creates the channel
/// that carries `DeepResearchEvent`s to subscribers.
const EVENT_CHANNEL_CAPACITY: usize = 512;
28
29pub struct DeepResearchRoles {
33 pub clarifier: Arc<dyn Clarifier>,
34 pub planner: Arc<dyn Planner>,
35 pub researcher: Arc<dyn Researcher>,
36 pub writer: Arc<dyn Writer>,
37 pub critic: Arc<dyn Critic>,
38 pub verifier: Arc<dyn CitationVerifier>,
39}
40
41impl DeepResearchRoles {
42 pub fn defaults() -> Self {
45 use crate::roles::{
46 ConcatWriter, DeterministicCitationVerifier, HeuristicPlanner, MockResearcher, RegexCritic,
47 TemplateClarifier,
48 };
49 Self {
50 clarifier: Arc::new(TemplateClarifier::new()),
51 planner: Arc::new(HeuristicPlanner::new()),
52 researcher: Arc::new(MockResearcher::new()),
53 writer: Arc::new(ConcatWriter::new()),
54 critic: Arc::new(RegexCritic::new()),
55 verifier: Arc::new(DeterministicCitationVerifier::new()),
56 }
57 }
58}
59
60pub struct DeepResearchHarness<L, T>
62where
63 L: DeepResearchLoopStrategy,
64 T: DeepResearchTermination,
65{
66 pub spec: DeepResearchHarnessSpec,
67 pub store: Arc<dyn ResearchStore>,
68 pub search: Arc<dyn WebSearch>,
69 pub retriever: Option<Arc<dyn Retriever>>,
70 pub roles: DeepResearchRoles,
71 pub loop_strategy: L,
72 pub termination: T,
73 pub bus: EventBus,
74 pub(crate) event_tx: broadcast::Sender<DeepResearchEvent>,
75 cancel: Arc<parking_lot::Mutex<bool>>,
76}
77
78impl<L, T> DeepResearchHarness<L, T>
79where
80 L: DeepResearchLoopStrategy,
81 T: DeepResearchTermination,
82{
83 pub fn new(
84 spec: DeepResearchHarnessSpec,
85 store: Arc<dyn ResearchStore>,
86 search: Arc<dyn WebSearch>,
87 roles: DeepResearchRoles,
88 loop_strategy: L,
89 termination: T,
90 ) -> Self {
91 let (event_tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
92 Self {
93 spec,
94 store,
95 search,
96 retriever: None,
97 roles,
98 loop_strategy,
99 termination,
100 bus: EventBus::new(),
101 event_tx,
102 cancel: Arc::new(parking_lot::Mutex::new(false)),
103 }
104 }
105
106 pub fn with_retriever(mut self, retriever: Arc<dyn Retriever>) -> Self {
107 self.retriever = Some(retriever);
108 self
109 }
110
111 pub fn events(&self) -> DeepResearchEventStream {
112 DeepResearchEventStream::new(self.event_tx.subscribe())
113 }
114
115 pub fn event_sender(&self) -> broadcast::Sender<DeepResearchEvent> {
116 self.event_tx.clone()
117 }
118
119 pub fn cancel(&self) {
120 *self.cancel.lock() = true;
121 }
122
123 pub async fn run(&self, request: ResearchRequest) -> Result<ResearchResult> {
125 let mut result = ResearchResult::new(request.query.clone(), self.loop_strategy.name());
126 result.model_id = self.spec.model_id.clone();
127 self.store.put(&result).await?;
128
129 let result_arc = Arc::new(Mutex::new(result.clone()));
130 let request_arc = Arc::new(request);
131 let mut handle = ResearchHandle::new(result_arc.clone(), request_arc.clone(), self.search.clone());
132 if let Some(r) = &self.retriever {
133 handle = handle.with_retriever(r.clone());
134 }
135 let handle = handle.with_events(self.event_tx.clone());
136
137 let _ = self.event_tx.send(DeepResearchEvent::Started {
138 strategy: self.loop_strategy.name().to_string(),
139 query: request_arc.query.clone(),
140 });
141 handle.set_state(ResearchState::Running);
142
143 let mut state = DeepResearchState::new(result_arc.lock().clone(), self.spec.initial_budget.remaining);
144 let run_id = RunId::new();
145 let final_reason = run_loop(
146 &self.spec,
147 &run_id,
148 &self.loop_strategy as &dyn DeepResearchLoopStrategy,
149 &self.termination as &dyn DeepResearchTermination,
150 self.store.clone(),
151 &handle,
152 &self.event_tx,
153 self.cancel.clone(),
154 &self.bus,
155 &self.roles,
156 &mut state,
157 )
158 .await;
159
160 match final_reason {
162 Ok(_) => {
163 handle.finalize();
164 }
165 Err(e) => {
166 handle.fail(e.to_string());
167 }
168 }
169 let final_result = result_arc.lock().clone();
170 self.store.put(&final_result).await?;
171 Ok(final_result)
172 }
173
174 pub fn into_boxed(self) -> BoxedDeepResearchHarness {
175 BoxedDeepResearchHarness {
176 spec: self.spec,
177 store: self.store,
178 search: self.search,
179 retriever: self.retriever,
180 roles: self.roles,
181 loop_strategy: Box::new(self.loop_strategy),
182 termination: Box::new(self.termination),
183 bus: self.bus,
184 event_tx: self.event_tx,
185 cancel: self.cancel,
186 }
187 }
188}
189
190#[async_trait]
191impl<L, T> Callable for DeepResearchHarness<L, T>
192where
193 L: DeepResearchLoopStrategy,
194 T: DeepResearchTermination,
195{
196 async fn call(&self, input: Value, _ctx: CallCtx) -> CoreResult<Value> {
197 let request = crate::dispatch::parse_request(input)?;
198 let result = self.run(request).await?;
199 Ok(serde_json::to_value(result).map_err(DeepResearchError::from)?)
200 }
201 fn label(&self) -> &str {
202 self.spec.id.as_str()
203 }
204}
205
206#[async_trait]
207impl<L, T> DeepResearchHarnessDispatch for DeepResearchHarness<L, T>
208where
209 L: DeepResearchLoopStrategy,
210 T: DeepResearchTermination,
211{
212 async fn dispatch(&self, request: ResearchRequest) -> CoreResult<Value> {
213 let result = self.run(request).await?;
214 Ok(serde_json::to_value(result).map_err(DeepResearchError::from)?)
215 }
216}
217
218#[allow(clippy::too_many_arguments)]
219pub(crate) async fn run_loop(
220 spec: &DeepResearchHarnessSpec,
221 run_id: &RunId,
222 loop_strategy: &dyn DeepResearchLoopStrategy,
223 termination: &dyn DeepResearchTermination,
224 store: Arc<dyn ResearchStore>,
225 handle: &ResearchHandle,
226 events: &broadcast::Sender<DeepResearchEvent>,
227 cancel: Arc<parking_lot::Mutex<bool>>,
228 bus: &EventBus,
229 roles: &DeepResearchRoles,
230 state: &mut DeepResearchState,
231) -> Result<&'static str> {
232 let final_reason: &'static str = loop {
233 if *cancel.lock() {
234 state.cancel_requested = true;
235 }
236 if let Termination::Done(reason) = termination.should_terminate(state) {
237 emit_iteration(
238 bus,
239 spec,
240 run_id,
241 state.iteration,
242 &format!("terminated:{reason}"),
243 );
244 break reason;
245 }
246 state.iteration += 1;
247
248 let mut ctx = DeepResearchStepCtx {
249 state,
250 handle,
251 store: store.clone(),
252 clarifier: roles.clarifier.as_ref(),
253 planner: roles.planner.as_ref(),
254 researcher: roles.researcher.as_ref(),
255 writer: roles.writer.as_ref(),
256 critic: roles.critic.as_ref(),
257 verifier: roles.verifier.as_ref(),
258 events,
259 };
260 let outcome = loop_strategy.step(&mut ctx).await?;
261 let label_owned = match &outcome {
262 DeepResearchStepOutcome::Continue { label } => label.clone(),
263 DeepResearchStepOutcome::Done { label } => label.clone(),
264 };
265 state.history.push(DeepResearchStepEvent {
266 iteration: state.iteration,
267 outcome: label_owned.clone(),
268 timestamp_ms: chrono::Utc::now().timestamp_millis(),
269 });
270 emit_iteration(bus, spec, run_id, state.iteration, &label_owned);
271
272 let snap = handle.snapshot();
274 state.result = snap.clone();
275 store.put(&snap).await?;
276
277 if matches!(outcome, DeepResearchStepOutcome::Done { .. }) {
278 break "complete";
279 }
280 };
281 Ok(final_reason)
282}
283
284fn emit_iteration(
285 bus: &EventBus,
286 spec: &DeepResearchHarnessSpec,
287 run_id: &RunId,
288 iteration: u64,
289 outcome: &str,
290) {
291 bus.emit_run(
292 Event::HarnessIteration {
293 harness_id: spec.id.clone(),
294 iteration,
295 outcome: outcome.to_string(),
296 budget_remaining_tokens: 0,
297 },
298 run_id.clone(),
299 None,
300 );
301}