1pub use super::*;
2use crate::log_shim::*;
3use crate::{db::controller::Controller, rt::agent::Runtime};
4use borderless::events::Message;
5use borderless::{
6 events::{CallAction, Events},
7 http::queries::Pagination,
8 AgentId, BorderlessId,
9};
10use borderless_kv_store::{backend::lmdb::Lmdb, Db};
11use http::method::Method;
12use std::collections::VecDeque;
13use std::convert::Infallible;
14use std::future::Future;
15use std::{
16 pin::Pin,
17 sync::Arc,
18 task::{Context, Poll},
19 time::Instant,
20};
21use tokio::sync::Mutex;
22
23#[derive(Serialize)]
24pub struct ActionResp {
25 pub events: Events,
26 pub action: CallAction,
27}
28
29pub trait EventHandler: Clone + Send + Sync {
30 type Error: std::fmt::Display + Send + Sync;
31
32 fn handle_events(&self, events: Events)
33 -> impl Future<Output = Result<(), Self::Error>> + Send;
34}
35
/// An [`EventHandler`] that ignores every event it is given.
#[derive(Clone)]
pub struct NoEventHandler;
41
42impl EventHandler for NoEventHandler {
43 type Error = Infallible;
44
45 async fn handle_events(&self, _events: Events) -> Result<(), Self::Error> {
46 Ok(())
47 }
48}
49
50#[derive(Clone)]
56pub struct RecursiveEventHandler<S: Db> {
57 pub rt: Arc<Mutex<Runtime<S>>>,
58}
59
60impl<S: Db> EventHandler for RecursiveEventHandler<S> {
61 type Error = crate::Error;
62
63 async fn handle_events(&self, events: Events) -> Result<(), Self::Error> {
64 let mut rt = self.rt.lock().await;
65 let db = rt.get_db();
66 let sub_handler = Controller::new(&db).messages();
67
68 let mut agent_events: VecDeque<_> = events.local.into();
69 while let Some(Message {
71 publisher,
72 topic,
73 value,
74 }) = agent_events.pop_front()
75 {
76 let subscribers = sub_handler.get_topic_subscribers(publisher, topic)?;
77
78 for (subscriber, method) in subscribers {
79 let action = CallAction::by_method(method, value.clone());
80 if let Some(events) = rt.process_action(&subscriber, action).await? {
82 agent_events.extend(events.local);
83 }
84 }
85 }
86 Ok(())
87 }
88}
89
90#[derive(Clone)]
92pub struct SwAgentService<E, S = Lmdb>
93where
94 S: Db + 'static,
95 E: EventHandler,
96{
97 rt: Arc<Mutex<Runtime<S>>>,
98 db: S,
99 writer: BorderlessId,
102 event_handler: E,
103}
104
105impl<S, E> SwAgentService<E, S>
106where
107 S: Db + 'static,
108 E: EventHandler,
109{
110 pub fn new(db: S, rt: Runtime<S>, writer: BorderlessId, event_handler: E) -> Self {
111 Self {
112 rt: Arc::new(Mutex::new(rt)),
113 db,
114 writer,
115 event_handler,
116 }
117 }
118
119 pub fn with_shared(
120 db: S,
121 rt: Arc<Mutex<Runtime<S>>>,
122 event_handler: E,
123 writer: BorderlessId,
124 ) -> Self {
125 Self {
126 rt,
127 db,
128 writer,
129 event_handler,
130 }
131 }
132
133 async fn process_rq(&self, req: Request) -> crate::Result<Response> {
134 let start = Instant::now();
135 let path = req.uri().path().to_string();
136 let result = match *req.method() {
137 Method::GET => self.process_get_rq(req).await,
138 Method::POST => self.process_post_rq(req).await,
139 _ => Ok(method_not_allowed()),
140 };
141 let elapsed = start.elapsed();
142 match &result {
144 Ok(res) => info!(
145 "Request success. path={path}. Time elapsed: {elapsed:?}, status={}",
146 res.status()
147 ),
148 Err(e) => warn!("Request failed. path={path}. Time elapsed: {elapsed:?}, error={e}"),
149 }
150 result
151 }
152
153 async fn process_get_rq(&self, req: Request) -> crate::Result<Response> {
154 let path = req.uri().path();
155 let query = req.uri().query();
156
157 if path == "/" {
158 let agents = self.rt.lock().await.available_agents()?;
159 return Ok(json_response(&agents));
160 }
161
162 let mut pieces = path.split('/').skip(1);
163
164 let agent_id: AgentId = match pieces.next().and_then(|first| first.parse().ok()) {
166 Some(aid) => aid,
167 None => return Ok(reject_404()),
168 };
169 let controller = Controller::new(&self.db);
170
171 if !controller.agent_exists(&agent_id)? {
173 return Ok(reject_404());
174 }
175
176 let route = match pieces.next() {
178 Some(r) => r,
179 None => {
180 let full_info = controller.agent_full(&agent_id)?;
182 return Ok(json_response(&full_info));
183 }
184 };
185
186 let mut trunc = String::new();
188 for piece in pieces {
189 trunc.push('/');
190 trunc.push_str(piece);
191 }
192 if trunc.is_empty() {
193 trunc.push('/');
194 }
195 if let Some(query) = query {
196 trunc.push('?');
197 trunc.push_str(query);
198 }
199 match route {
200 "state" => {
201 let mut rt = self.rt.lock().await;
203 let (status, payload) = rt.http_get_state(&agent_id, trunc).await?;
204 if status == 200 {
205 Ok(json_body(payload))
206 } else {
207 Ok(reject_404())
208 }
209 }
210 "logs" => {
211 let pagination = Pagination::from_query(query).unwrap_or_default();
213
214 let log = controller.logs(agent_id).get_logs_paginated(pagination)?;
216
217 Ok(json_response(&log))
218 }
219 "sinks" => {
220 let sinks = controller.agent_sinks(&agent_id)?;
221 Ok(json_response(&sinks))
222 }
223 "subs" => {
224 let subs = controller.agent_subs(&agent_id)?;
225 Ok(json_response(&subs))
226 }
227 "desc" => {
228 let desc = controller.agent_desc(&agent_id)?;
229 Ok(json_response_nested(desc, &trunc))
230 }
231 "meta" => {
232 let meta = controller.agent_meta(&agent_id)?;
233 Ok(json_response_nested(meta, &trunc))
234 }
235 "symbols" => {
236 let mut rt = self.rt.lock().await;
237 let symbols = rt.get_symbols(&agent_id).await?;
238 Ok(json_response_nested(symbols, &trunc))
239 }
240 "pkg" => match trunc.as_str() {
241 "/" => {
242 let result = controller.agent_pkg_full(&agent_id)?.map(|r| r.into_dto());
243 Ok(json_response(&result))
244 }
245 "/def" => {
246 let result = controller.agent_pkg_def(&agent_id)?.map(|r| r.into_dto());
247 Ok(json_response(&result))
248 }
249 "/source" => {
250 let result = controller.agent_pkg_source(&agent_id)?;
251 Ok(json_response(&result))
252 }
253 _ => Ok(reject_404()),
254 },
255 "" => {
257 let full_info = controller.agent_full(&agent_id)?;
258 Ok(json_response(&full_info))
259 }
260 _ => Ok(reject_404()),
261 }
262 }
263
264 async fn process_post_rq(&self, req: Request) -> crate::Result<Response> {
265 let path = req.uri().path();
266
267 if path == "/" {
268 return Ok(method_not_allowed());
269 }
270
271 let mut pieces = path.split('/').skip(1);
272
273 let aid_str = match pieces.next() {
275 Some(s) => s,
276 None => return Ok(method_not_allowed()),
277 };
278 let agent_id: AgentId = match aid_str.parse() {
279 Ok(aid) => aid,
280 Err(e) => return Ok(bad_request(format!("failed to parse agent-id - {e}"))),
281 };
282
283 let route = match pieces.next() {
285 Some(r) => r,
286 None => return Ok(reject_404()),
287 };
288
289 let mut trunc = String::new();
291 let mut cnt = 0;
292 for piece in pieces {
293 trunc.push('/');
294 trunc.push_str(piece);
295 cnt += 1;
296 }
297 if cnt > 1 {
299 return Ok(reject_404());
300 }
301 if trunc.is_empty() {
302 trunc.push('/');
303 }
304 if let Some(query) = req.uri().query() {
305 trunc.push('?');
306 trunc.push_str(query);
307 }
308 match route {
309 "action" => {
310 let (parts, payload) = req.into_parts();
312 if !check_json_content(&parts) {
313 return Ok(unsupported_media_type());
314 }
315 let (events, action) = {
316 let mut rt = self.rt.lock().await;
317 rt.set_executor(self.writer)?; match rt
319 .http_post_action(&agent_id, trunc, payload.into(), &self.writer)
320 .await?
321 {
322 Ok(out) => out,
323 Err((status, err)) => {
324 return Ok(err_response(status.try_into().unwrap(), err))
325 }
326 }
327 };
328 match self.event_handler.handle_events(events.clone()).await {
330 Ok(_) => {
331 let resp = ActionResp { events, action };
333 Ok(json_response(&resp))
334 }
335 Err(e) => Ok(into_server_error(e)),
336 }
337 }
338 "" => Ok(method_not_allowed()),
339 _ => Ok(reject_404()),
340 }
341 }
342}
343
344impl<E, S> Service<Request> for SwAgentService<E, S>
345where
346 S: Db + 'static,
347 E: EventHandler + 'static,
348{
349 type Response = Response;
350 type Error = Infallible;
351 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
352
353 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
354 Poll::Ready(Ok(()))
355 }
356
357 fn call(&mut self, req: Request) -> Self::Future {
358 let this = self.clone();
359 let fut = async move {
360 let result: Response = match this.process_rq(req).await {
361 Ok(r) => r,
362 Err(e) => into_server_error(e),
363 };
364 Ok(result)
365 };
366 Box::pin(fut)
367 }
368}