Skip to main content

atomr_agents_deep_research_harness/
harness.rs

1//! The typed deep-research harness + shared run loop.
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use atomr_agents_callable::Callable;
7use atomr_agents_core::{CallCtx, Event, Result as CoreResult, RunId, Value};
8use atomr_agents_deep_research_core::{ResearchRequest, ResearchResult, ResearchState};
9use atomr_agents_observability::EventBus;
10use atomr_agents_retriever::Retriever;
11use atomr_agents_web_search_core::WebSearch;
12use parking_lot::Mutex;
13use tokio::sync::broadcast;
14
15use crate::boxed::BoxedDeepResearchHarness;
16use crate::dispatch::DeepResearchHarnessDispatch;
17use crate::error::{DeepResearchError, Result};
18use crate::events::{DeepResearchEvent, DeepResearchEventStream};
19use crate::handle::ResearchHandle;
20use crate::loop_strategy::{DeepResearchLoopStrategy, DeepResearchStepCtx, DeepResearchStepOutcome};
21use crate::roles::{CitationVerifier, Clarifier, Critic, Planner, Researcher, Writer};
22use crate::spec::DeepResearchHarnessSpec;
23use crate::state::{DeepResearchState, DeepResearchStepEvent};
24use crate::store::ResearchStore;
25use crate::termination::{DeepResearchTermination, Termination};
26
/// Buffer size of the `tokio::sync::broadcast` channel that fans
/// [`DeepResearchEvent`]s out to subscribers. Per broadcast semantics, a
/// receiver that falls more than this many messages behind loses the
/// oldest ones.
const EVENT_CHANNEL_CAPACITY: usize = 512;
28
29/// The full set of role implementations a harness instance is wired
30/// with. Kept as one struct so callers can construct a harness with
31/// `roles: DeepResearchRoles { ... }` rather than naming six arguments.
32pub struct DeepResearchRoles {
33    pub clarifier: Arc<dyn Clarifier>,
34    pub planner: Arc<dyn Planner>,
35    pub researcher: Arc<dyn Researcher>,
36    pub writer: Arc<dyn Writer>,
37    pub critic: Arc<dyn Critic>,
38    pub verifier: Arc<dyn CitationVerifier>,
39}
40
41impl DeepResearchRoles {
42    /// Default deterministic LLM-free roles (mirrors the meetings
43    /// harness's `RuleBasedExtractor` pattern).
44    pub fn defaults() -> Self {
45        use crate::roles::{
46            ConcatWriter, DeterministicCitationVerifier, HeuristicPlanner, MockResearcher, RegexCritic,
47            TemplateClarifier,
48        };
49        Self {
50            clarifier: Arc::new(TemplateClarifier::new()),
51            planner: Arc::new(HeuristicPlanner::new()),
52            researcher: Arc::new(MockResearcher::new()),
53            writer: Arc::new(ConcatWriter::new()),
54            critic: Arc::new(RegexCritic::new()),
55            verifier: Arc::new(DeterministicCitationVerifier::new()),
56        }
57    }
58}
59
60/// A typed deep-research harness.
61pub struct DeepResearchHarness<L, T>
62where
63    L: DeepResearchLoopStrategy,
64    T: DeepResearchTermination,
65{
66    pub spec: DeepResearchHarnessSpec,
67    pub store: Arc<dyn ResearchStore>,
68    pub search: Arc<dyn WebSearch>,
69    pub retriever: Option<Arc<dyn Retriever>>,
70    pub roles: DeepResearchRoles,
71    pub loop_strategy: L,
72    pub termination: T,
73    pub bus: EventBus,
74    pub(crate) event_tx: broadcast::Sender<DeepResearchEvent>,
75    cancel: Arc<parking_lot::Mutex<bool>>,
76}
77
78impl<L, T> DeepResearchHarness<L, T>
79where
80    L: DeepResearchLoopStrategy,
81    T: DeepResearchTermination,
82{
83    pub fn new(
84        spec: DeepResearchHarnessSpec,
85        store: Arc<dyn ResearchStore>,
86        search: Arc<dyn WebSearch>,
87        roles: DeepResearchRoles,
88        loop_strategy: L,
89        termination: T,
90    ) -> Self {
91        let (event_tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
92        Self {
93            spec,
94            store,
95            search,
96            retriever: None,
97            roles,
98            loop_strategy,
99            termination,
100            bus: EventBus::new(),
101            event_tx,
102            cancel: Arc::new(parking_lot::Mutex::new(false)),
103        }
104    }
105
106    pub fn with_retriever(mut self, retriever: Arc<dyn Retriever>) -> Self {
107        self.retriever = Some(retriever);
108        self
109    }
110
111    pub fn events(&self) -> DeepResearchEventStream {
112        DeepResearchEventStream::new(self.event_tx.subscribe())
113    }
114
115    pub fn event_sender(&self) -> broadcast::Sender<DeepResearchEvent> {
116        self.event_tx.clone()
117    }
118
119    pub fn cancel(&self) {
120        *self.cancel.lock() = true;
121    }
122
123    /// Run the harness against a [`ResearchRequest`].
124    pub async fn run(&self, request: ResearchRequest) -> Result<ResearchResult> {
125        let mut result = ResearchResult::new(request.query.clone(), self.loop_strategy.name());
126        result.model_id = self.spec.model_id.clone();
127        self.store.put(&result).await?;
128
129        let result_arc = Arc::new(Mutex::new(result.clone()));
130        let request_arc = Arc::new(request);
131        let mut handle = ResearchHandle::new(result_arc.clone(), request_arc.clone(), self.search.clone());
132        if let Some(r) = &self.retriever {
133            handle = handle.with_retriever(r.clone());
134        }
135        let handle = handle.with_events(self.event_tx.clone());
136
137        let _ = self.event_tx.send(DeepResearchEvent::Started {
138            strategy: self.loop_strategy.name().to_string(),
139            query: request_arc.query.clone(),
140        });
141        handle.set_state(ResearchState::Running);
142
143        let mut state = DeepResearchState::new(result_arc.lock().clone(), self.spec.initial_budget.remaining);
144        let run_id = RunId::new();
145        let final_reason = run_loop(
146            &self.spec,
147            &run_id,
148            &self.loop_strategy as &dyn DeepResearchLoopStrategy,
149            &self.termination as &dyn DeepResearchTermination,
150            self.store.clone(),
151            &handle,
152            &self.event_tx,
153            self.cancel.clone(),
154            &self.bus,
155            &self.roles,
156            &mut state,
157        )
158        .await;
159
160        // Persist final state (success OR failure).
161        match final_reason {
162            Ok(_) => {
163                handle.finalize();
164            }
165            Err(e) => {
166                handle.fail(e.to_string());
167            }
168        }
169        let final_result = result_arc.lock().clone();
170        self.store.put(&final_result).await?;
171        Ok(final_result)
172    }
173
174    pub fn into_boxed(self) -> BoxedDeepResearchHarness {
175        BoxedDeepResearchHarness {
176            spec: self.spec,
177            store: self.store,
178            search: self.search,
179            retriever: self.retriever,
180            roles: self.roles,
181            loop_strategy: Box::new(self.loop_strategy),
182            termination: Box::new(self.termination),
183            bus: self.bus,
184            event_tx: self.event_tx,
185            cancel: self.cancel,
186        }
187    }
188}
189
190#[async_trait]
191impl<L, T> Callable for DeepResearchHarness<L, T>
192where
193    L: DeepResearchLoopStrategy,
194    T: DeepResearchTermination,
195{
196    async fn call(&self, input: Value, _ctx: CallCtx) -> CoreResult<Value> {
197        let request = crate::dispatch::parse_request(input)?;
198        let result = self.run(request).await?;
199        Ok(serde_json::to_value(result).map_err(DeepResearchError::from)?)
200    }
201    fn label(&self) -> &str {
202        self.spec.id.as_str()
203    }
204}
205
206#[async_trait]
207impl<L, T> DeepResearchHarnessDispatch for DeepResearchHarness<L, T>
208where
209    L: DeepResearchLoopStrategy,
210    T: DeepResearchTermination,
211{
212    async fn dispatch(&self, request: ResearchRequest) -> CoreResult<Value> {
213        let result = self.run(request).await?;
214        Ok(serde_json::to_value(result).map_err(DeepResearchError::from)?)
215    }
216}
217
218#[allow(clippy::too_many_arguments)]
219pub(crate) async fn run_loop(
220    spec: &DeepResearchHarnessSpec,
221    run_id: &RunId,
222    loop_strategy: &dyn DeepResearchLoopStrategy,
223    termination: &dyn DeepResearchTermination,
224    store: Arc<dyn ResearchStore>,
225    handle: &ResearchHandle,
226    events: &broadcast::Sender<DeepResearchEvent>,
227    cancel: Arc<parking_lot::Mutex<bool>>,
228    bus: &EventBus,
229    roles: &DeepResearchRoles,
230    state: &mut DeepResearchState,
231) -> Result<&'static str> {
232    let final_reason: &'static str = loop {
233        if *cancel.lock() {
234            state.cancel_requested = true;
235        }
236        if let Termination::Done(reason) = termination.should_terminate(state) {
237            emit_iteration(
238                bus,
239                spec,
240                run_id,
241                state.iteration,
242                &format!("terminated:{reason}"),
243            );
244            break reason;
245        }
246        state.iteration += 1;
247
248        let mut ctx = DeepResearchStepCtx {
249            state,
250            handle,
251            store: store.clone(),
252            clarifier: roles.clarifier.as_ref(),
253            planner: roles.planner.as_ref(),
254            researcher: roles.researcher.as_ref(),
255            writer: roles.writer.as_ref(),
256            critic: roles.critic.as_ref(),
257            verifier: roles.verifier.as_ref(),
258            events,
259        };
260        let outcome = loop_strategy.step(&mut ctx).await?;
261        let label_owned = match &outcome {
262            DeepResearchStepOutcome::Continue { label } => label.clone(),
263            DeepResearchStepOutcome::Done { label } => label.clone(),
264        };
265        state.history.push(DeepResearchStepEvent {
266            iteration: state.iteration,
267            outcome: label_owned.clone(),
268            timestamp_ms: chrono::Utc::now().timestamp_millis(),
269        });
270        emit_iteration(bus, spec, run_id, state.iteration, &label_owned);
271
272        // Persist incrementally so the web UI sees in-flight progress.
273        let snap = handle.snapshot();
274        state.result = snap.clone();
275        store.put(&snap).await?;
276
277        if matches!(outcome, DeepResearchStepOutcome::Done { .. }) {
278            break "complete";
279        }
280    };
281    Ok(final_reason)
282}
283
284fn emit_iteration(
285    bus: &EventBus,
286    spec: &DeepResearchHarnessSpec,
287    run_id: &RunId,
288    iteration: u64,
289    outcome: &str,
290) {
291    bus.emit_run(
292        Event::HarnessIteration {
293            harness_id: spec.id.clone(),
294            iteration,
295            outcome: outcome.to_string(),
296            budget_remaining_tokens: 0,
297        },
298        run_id.clone(),
299        None,
300    );
301}