Skip to main content

atomr_agents_deep_research_harness/
boxed.rs

1//! Type-erased deep-research harness.
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use atomr_agents_callable::Callable;
7use atomr_agents_core::{CallCtx, Result as CoreResult, RunId, Value};
8use atomr_agents_deep_research_core::{ResearchRequest, ResearchResult, ResearchState};
9use atomr_agents_observability::EventBus;
10use atomr_agents_retriever::Retriever;
11use atomr_agents_web_search_core::WebSearch;
12use parking_lot::Mutex;
13use tokio::sync::broadcast;
14
15use crate::dispatch::DeepResearchHarnessDispatch;
16use crate::error::{DeepResearchError, Result};
17use crate::events::{DeepResearchEvent, DeepResearchEventStream};
18use crate::handle::ResearchHandle;
19use crate::harness::{run_loop, DeepResearchRoles};
20use crate::loop_strategy::DeepResearchLoopStrategy;
21use crate::spec::DeepResearchHarnessSpec;
22use crate::state::DeepResearchState;
23use crate::store::ResearchStore;
24use crate::termination::DeepResearchTermination;
25
26/// Type-erased deep-research harness.
27pub struct BoxedDeepResearchHarness {
28    pub spec: DeepResearchHarnessSpec,
29    pub store: Arc<dyn ResearchStore>,
30    pub search: Arc<dyn WebSearch>,
31    pub retriever: Option<Arc<dyn Retriever>>,
32    pub roles: DeepResearchRoles,
33    pub loop_strategy: Box<dyn DeepResearchLoopStrategy>,
34    pub termination: Box<dyn DeepResearchTermination>,
35    pub bus: EventBus,
36    pub(crate) event_tx: broadcast::Sender<DeepResearchEvent>,
37    pub(crate) cancel: Arc<parking_lot::Mutex<bool>>,
38}
39
40impl BoxedDeepResearchHarness {
41    pub fn new(
42        spec: DeepResearchHarnessSpec,
43        store: Arc<dyn ResearchStore>,
44        search: Arc<dyn WebSearch>,
45        roles: DeepResearchRoles,
46        loop_strategy: Box<dyn DeepResearchLoopStrategy>,
47        termination: Box<dyn DeepResearchTermination>,
48    ) -> Self {
49        let (event_tx, _) = broadcast::channel(512);
50        Self {
51            spec,
52            store,
53            search,
54            retriever: None,
55            roles,
56            loop_strategy,
57            termination,
58            bus: EventBus::new(),
59            event_tx,
60            cancel: Arc::new(parking_lot::Mutex::new(false)),
61        }
62    }
63
64    pub fn with_retriever(mut self, retriever: Arc<dyn Retriever>) -> Self {
65        self.retriever = Some(retriever);
66        self
67    }
68
69    pub fn events(&self) -> DeepResearchEventStream {
70        DeepResearchEventStream::new(self.event_tx.subscribe())
71    }
72
73    pub fn event_sender(&self) -> broadcast::Sender<DeepResearchEvent> {
74        self.event_tx.clone()
75    }
76
77    pub fn cancel(&self) {
78        *self.cancel.lock() = true;
79    }
80
81    pub async fn run(&self, request: ResearchRequest) -> Result<ResearchResult> {
82        let mut result = ResearchResult::new(request.query.clone(), self.loop_strategy.name());
83        result.model_id = self.spec.model_id.clone();
84        self.store.put(&result).await?;
85
86        let result_arc = Arc::new(Mutex::new(result.clone()));
87        let request_arc = Arc::new(request);
88        let mut handle = ResearchHandle::new(result_arc.clone(), request_arc.clone(), self.search.clone());
89        if let Some(r) = &self.retriever {
90            handle = handle.with_retriever(r.clone());
91        }
92        let handle = handle.with_events(self.event_tx.clone());
93
94        let _ = self.event_tx.send(DeepResearchEvent::Started {
95            strategy: self.loop_strategy.name().to_string(),
96            query: request_arc.query.clone(),
97        });
98        handle.set_state(ResearchState::Running);
99
100        let mut state = DeepResearchState::new(result_arc.lock().clone(), self.spec.initial_budget.remaining);
101        let run_id = RunId::new();
102        let final_reason = run_loop(
103            &self.spec,
104            &run_id,
105            &*self.loop_strategy,
106            &*self.termination,
107            self.store.clone(),
108            &handle,
109            &self.event_tx,
110            self.cancel.clone(),
111            &self.bus,
112            &self.roles,
113            &mut state,
114        )
115        .await;
116        match final_reason {
117            Ok(_) => handle.finalize(),
118            Err(e) => handle.fail(e.to_string()),
119        }
120        let final_result = result_arc.lock().clone();
121        self.store.put(&final_result).await?;
122        Ok(final_result)
123    }
124}
125
126#[async_trait]
127impl Callable for BoxedDeepResearchHarness {
128    async fn call(&self, input: Value, _ctx: CallCtx) -> CoreResult<Value> {
129        let request = crate::dispatch::parse_request(input)?;
130        let result = self.run(request).await?;
131        Ok(serde_json::to_value(result).map_err(DeepResearchError::from)?)
132    }
133    fn label(&self) -> &str {
134        self.spec.id.as_str()
135    }
136}
137
138#[async_trait]
139impl DeepResearchHarnessDispatch for BoxedDeepResearchHarness {
140    async fn dispatch(&self, request: ResearchRequest) -> CoreResult<Value> {
141        let result = self.run(request).await?;
142        Ok(serde_json::to_value(result).map_err(DeepResearchError::from)?)
143    }
144}