1use borderless::{
2 events::{AgentCall, CallAction, Events},
3 http::queries::Pagination,
4 AgentId, BorderlessId,
5};
6use borderless_kv_store::{backend::lmdb::Lmdb, Db};
7use http::method::Method;
8use std::collections::VecDeque;
9use std::convert::Infallible;
10use std::future::Future;
11use std::{
12 pin::Pin,
13 sync::Arc,
14 task::{Context, Poll},
15 time::Instant,
16};
17use tokio::sync::Mutex;
18
19pub use super::*;
20use crate::log_shim::*;
21use crate::{db::controller::Controller, rt::agent::Runtime};
22
23#[derive(Serialize)]
24pub struct ActionResp {
25 pub events: Events,
26 pub action: CallAction,
27}
28
29pub trait EventHandler: Clone + Send + Sync {
30 type Error: std::fmt::Display + Send + Sync;
31
32 fn handle_events(&self, events: Events)
33 -> impl Future<Output = Result<(), Self::Error>> + Send;
34}
35
36#[derive(Clone)]
40pub struct NoEventHandler;
41
42impl EventHandler for NoEventHandler {
43 type Error = Infallible;
44
45 async fn handle_events(&self, _events: Events) -> Result<(), Self::Error> {
46 Ok(())
47 }
48}
49
50#[derive(Clone)]
56pub struct RecursiveEventHandler<S: Db> {
57 pub rt: Arc<Mutex<Runtime<S>>>,
58}
59
60impl<S: Db> EventHandler for RecursiveEventHandler<S> {
61 type Error = crate::Error;
62
63 async fn handle_events(&self, events: Events) -> Result<(), Self::Error> {
64 let mut rt = self.rt.lock().await;
65 let mut agent_events = VecDeque::with_capacity(events.local.len());
66 agent_events.extend(events.local);
67
68 while let Some(AgentCall { agent_id, action }) = agent_events.pop_front() {
70 if let Some(events) = rt.process_action(&agent_id, action).await? {
72 agent_events.extend(events.local);
73 }
74 }
75 Ok(())
76 }
77}
78
79#[derive(Clone)]
81pub struct SwAgentService<E, S = Lmdb>
82where
83 S: Db + 'static,
84 E: EventHandler,
85{
86 rt: Arc<Mutex<Runtime<S>>>,
87 db: S,
88 writer: BorderlessId,
91 event_handler: E,
92}
93
94impl<S, E> SwAgentService<E, S>
95where
96 S: Db + 'static,
97 E: EventHandler,
98{
99 pub fn new(db: S, rt: Runtime<S>, writer: BorderlessId, event_handler: E) -> Self {
100 Self {
101 rt: Arc::new(Mutex::new(rt)),
102 db,
103 writer,
104 event_handler,
105 }
106 }
107
108 pub fn with_shared(
109 db: S,
110 rt: Arc<Mutex<Runtime<S>>>,
111 event_handler: E,
112 writer: BorderlessId,
113 ) -> Self {
114 Self {
115 rt,
116 db,
117 writer,
118 event_handler,
119 }
120 }
121
122 async fn process_rq(&self, req: Request) -> crate::Result<Response> {
123 let start = Instant::now();
124 let path = req.uri().path().to_string();
125 let result = match *req.method() {
126 Method::GET => self.process_get_rq(req).await,
127 Method::POST => self.process_post_rq(req).await,
128 _ => Ok(method_not_allowed()),
129 };
130 let elapsed = start.elapsed();
131 info!("Finished executing request. path={path}. Time elapsed: {elapsed:?}");
132 result
133 }
134
135 async fn process_get_rq(&self, req: Request) -> crate::Result<Response> {
136 let path = req.uri().path();
137 let query = req.uri().query();
138
139 if path == "/" {
140 let agents = self.rt.lock().await.available_agents()?;
141 return Ok(json_response(&agents));
142 }
143
144 let mut pieces = path.split('/').skip(1);
145
146 let agent_id: AgentId = match pieces.next().and_then(|first| first.parse().ok()) {
148 Some(aid) => aid,
149 None => return Ok(reject_404()),
150 };
151 let controller = Controller::new(&self.db);
152
153 if !controller.agent_exists(&agent_id)? {
155 return Ok(reject_404());
156 }
157
158 let route = match pieces.next() {
160 Some(r) => r,
161 None => {
162 let full_info = controller.agent_full(&agent_id)?;
164 return Ok(json_response(&full_info));
165 }
166 };
167
168 let mut trunc = String::new();
170 for piece in pieces {
171 trunc.push('/');
172 trunc.push_str(piece);
173 }
174 if trunc.is_empty() {
175 trunc.push('/');
176 }
177 if let Some(query) = query {
178 trunc.push('?');
179 trunc.push_str(query);
180 }
181 match route {
182 "state" => {
183 let mut rt = self.rt.lock().await;
185 let (status, payload) = rt.http_get_state(&agent_id, trunc).await?;
186 if status == 200 {
187 Ok(json_body(payload))
188 } else {
189 Ok(reject_404())
190 }
191 }
192 "logs" => {
193 let pagination = Pagination::from_query(query).unwrap_or_default();
195
196 let log = controller.logs(agent_id).get_logs_paginated(pagination)?;
198
199 Ok(json_response(&log))
200 }
201 "sinks" => {
202 let sinks = controller.agent_sinks(&agent_id)?;
203 Ok(json_response(&sinks))
204 }
205 "desc" => {
206 let desc = controller.agent_desc(&agent_id)?;
207 Ok(json_response_nested(desc, &trunc))
208 }
209 "meta" => {
210 let meta = controller.agent_meta(&agent_id)?;
211 Ok(json_response_nested(meta, &trunc))
212 }
213 "symbols" => {
214 let mut rt = self.rt.lock().await;
215 let symbols = rt.get_symbols(&agent_id).await?;
216 Ok(json_response_nested(symbols, &trunc))
217 }
218 "pkg" => match trunc.as_str() {
219 "/" => {
220 let result = controller.agent_pkg_full(&agent_id)?.map(|r| r.into_dto());
221 Ok(json_response(&result))
222 }
223 "/def" => {
224 let result = controller.agent_pkg_def(&agent_id)?.map(|r| r.into_dto());
225 Ok(json_response(&result))
226 }
227 "/source" => {
228 let result = controller.agent_pkg_source(&agent_id)?;
229 Ok(json_response(&result))
230 }
231 _ => Ok(reject_404()),
232 },
233 "" => {
235 let full_info = controller.agent_full(&agent_id)?;
236 Ok(json_response(&full_info))
237 }
238 _ => Ok(reject_404()),
239 }
240 }
241
242 async fn process_post_rq(&self, req: Request) -> crate::Result<Response> {
243 let path = req.uri().path();
244
245 if path == "/" {
246 return Ok(method_not_allowed());
247 }
248
249 let mut pieces = path.split('/').skip(1);
250
251 let aid_str = match pieces.next() {
253 Some(s) => s,
254 None => return Ok(method_not_allowed()),
255 };
256 let agent_id: AgentId = match aid_str.parse() {
257 Ok(aid) => aid,
258 Err(e) => return Ok(bad_request(format!("failed to parse agent-id - {e}"))),
259 };
260
261 let route = match pieces.next() {
263 Some(r) => r,
264 None => return Ok(reject_404()),
265 };
266
267 let mut trunc = String::new();
269 let mut cnt = 0;
270 for piece in pieces {
271 trunc.push('/');
272 trunc.push_str(piece);
273 cnt += 1;
274 }
275 if cnt > 1 {
277 return Ok(reject_404());
278 }
279 if trunc.is_empty() {
280 trunc.push('/');
281 }
282 if let Some(query) = req.uri().query() {
283 trunc.push('?');
284 trunc.push_str(query);
285 }
286 match route {
287 "action" => {
288 let (parts, payload) = req.into_parts();
290 if !check_json_content(&parts) {
291 return Ok(unsupported_media_type());
292 }
293 let (events, action) = {
294 let mut rt = self.rt.lock().await;
295 rt.set_executor(self.writer)?; match rt
297 .http_post_action(&agent_id, trunc, payload.into(), &self.writer)
298 .await?
299 {
300 Ok(out) => out,
301 Err((status, err)) => {
302 return Ok(err_response(status.try_into().unwrap(), err))
303 }
304 }
305 };
306 match self.event_handler.handle_events(events.clone()).await {
308 Ok(_) => {
309 let resp = ActionResp { events, action };
311 Ok(json_response(&resp))
312 }
313 Err(e) => Ok(into_server_error(e)),
314 }
315 }
316 "" => Ok(method_not_allowed()),
317 _ => Ok(reject_404()),
318 }
319 }
320}
321
322impl<E, S> Service<Request> for SwAgentService<E, S>
323where
324 S: Db + 'static,
325 E: EventHandler + 'static,
326{
327 type Response = Response;
328 type Error = Infallible;
329 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
330
331 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
332 Poll::Ready(Ok(()))
333 }
334
335 fn call(&mut self, req: Request) -> Self::Future {
336 let this = self.clone();
337 let fut = async move {
338 let result: Response = match this.process_rq(req).await {
339 Ok(r) => r,
340 Err(e) => into_server_error(e),
341 };
342 Ok(result)
343 };
344 Box::pin(fut)
345 }
346}