1pub use super::*;
2use crate::log_shim::*;
3use crate::{db::controller::Controller, rt::agent::Runtime};
4use borderless::events::{Message, Topic, TopicDto};
5use borderless::{
6 events::{CallAction, Events},
7 http::queries::Pagination,
8 AgentId, BorderlessId,
9};
10use borderless_kv_store::{backend::lmdb::Lmdb, Db};
11use http::method::Method;
12use serde_json::json;
13use std::collections::VecDeque;
14use std::convert::Infallible;
15use std::future::Future;
16use std::{
17 pin::Pin,
18 sync::Arc,
19 task::{Context, Poll},
20 time::Instant,
21};
22use tokio::sync::Mutex;
23
/// JSON body sent back after an action was posted to an agent and its events were handled.
#[derive(Serialize)]
pub struct ActionResp {
    /// Events returned by the runtime for the executed action.
    pub events: Events,
    /// The action as returned by the runtime's `http_post_action`.
    pub action: CallAction,
}
29
/// Hook that receives the events produced by an agent action.
///
/// The service calls [`EventHandler::handle_events`] after an action was processed by the
/// runtime. If the handler fails, the error is turned into a response via `into_server_error`.
pub trait EventHandler: Clone + Send + Sync {
    /// Error produced when the events could not be handled; only needs to be displayable.
    type Error: std::fmt::Display + Send + Sync;

    /// Handles the given `events`.
    fn handle_events(&self, events: Events)
        -> impl Future<Output = Result<(), Self::Error>> + Send;
}
36
37#[derive(Clone)]
41pub struct NoEventHandler;
42
43impl EventHandler for NoEventHandler {
44 type Error = Infallible;
45
46 async fn handle_events(&self, _events: Events) -> Result<(), Self::Error> {
47 Ok(())
48 }
49}
50
51#[derive(Clone)]
57pub struct RecursiveEventHandler<S: Db> {
58 pub rt: Arc<Mutex<Runtime<S>>>,
59}
60
61impl<S: Db> EventHandler for RecursiveEventHandler<S> {
62 type Error = crate::Error;
63
64 async fn handle_events(&self, events: Events) -> Result<(), Self::Error> {
65 let mut rt = self.rt.lock().await;
66 let db = rt.get_db();
67 let sub_handler = Controller::new(&db).messages();
68
69 let mut agent_events: VecDeque<_> = events.local.into();
70 while let Some(Message {
72 publisher,
73 topic,
74 value,
75 }) = agent_events.pop_front()
76 {
77 let subscribers = sub_handler.get_topic_subscribers(publisher, topic)?;
78
79 for (subscriber, method) in subscribers {
80 let action = CallAction::by_method(method, value.clone());
81 if let Some(events) = rt.process_action(&subscriber, action).await? {
83 agent_events.extend(events.local);
84 }
85 }
86 }
87 Ok(())
88 }
89}
90
91#[derive(Clone)]
93pub struct SwAgentService<E, S = Lmdb>
94where
95 S: Db + 'static,
96 E: EventHandler,
97{
98 rt: Arc<Mutex<Runtime<S>>>,
99 db: S,
100 writer: BorderlessId,
103 event_handler: E,
104}
105
106impl<S, E> SwAgentService<E, S>
107where
108 S: Db + 'static,
109 E: EventHandler,
110{
111 pub fn new(db: S, rt: Runtime<S>, writer: BorderlessId, event_handler: E) -> Self {
112 Self {
113 rt: Arc::new(Mutex::new(rt)),
114 db,
115 writer,
116 event_handler,
117 }
118 }
119
120 pub fn with_shared(
121 db: S,
122 rt: Arc<Mutex<Runtime<S>>>,
123 event_handler: E,
124 writer: BorderlessId,
125 ) -> Self {
126 Self {
127 rt,
128 db,
129 writer,
130 event_handler,
131 }
132 }
133
134 async fn process_rq(&self, req: Request) -> crate::Result<Response> {
135 let start = Instant::now();
136 let path = req.uri().path().to_string();
137 let result = match *req.method() {
138 Method::GET => self.process_get_rq(req).await,
139 Method::POST => self.process_post_rq(req).await,
140 _ => Ok(method_not_allowed()),
141 };
142 let elapsed = start.elapsed();
143 match &result {
145 Ok(res) => info!(
146 "Request success. path={path}. Time elapsed: {elapsed:?}, status={}",
147 res.status()
148 ),
149 Err(e) => warn!("Request failed. path={path}. Time elapsed: {elapsed:?}, error={e}"),
150 }
151 result
152 }
153
154 async fn process_get_rq(&self, req: Request) -> crate::Result<Response> {
155 let path = req.uri().path();
156 let query = req.uri().query();
157
158 if path == "/" {
159 let agents = self.rt.lock().await.available_agents()?;
160 return Ok(json_response(&agents));
161 }
162
163 let mut pieces = path.split('/').skip(1);
164
165 let agent_id: AgentId = match pieces.next().and_then(|first| first.parse().ok()) {
167 Some(aid) => aid,
168 None => return Ok(reject_404()),
169 };
170 let controller = Controller::new(&self.db);
171
172 if !controller.agent_exists(&agent_id)? {
174 return Ok(reject_404());
175 }
176
177 let route = match pieces.next() {
179 Some(r) => r,
180 None => {
181 let full_info = controller.agent_full(&agent_id)?;
183 return Ok(json_response(&full_info));
184 }
185 };
186
187 let mut trunc = String::new();
189 for piece in pieces {
190 trunc.push('/');
191 trunc.push_str(piece);
192 }
193 if trunc.is_empty() {
194 trunc.push('/');
195 }
196 if let Some(query) = query {
197 trunc.push('?');
198 trunc.push_str(query);
199 }
200 match route {
201 "state" => {
202 let mut rt = self.rt.lock().await;
204 let (status, payload) = rt.http_get_state(&agent_id, trunc).await?;
205 if status == 200 {
206 Ok(json_body(payload))
207 } else {
208 Ok(reject_404())
209 }
210 }
211 "logs" => {
212 let pagination = Pagination::from_query(query).unwrap_or_default();
214
215 let log = controller.logs(agent_id).get_logs_paginated(pagination)?;
217
218 Ok(json_response(&log))
219 }
220 "sinks" => {
221 let sinks = controller.agent_sinks(&agent_id)?;
222 Ok(json_response(&sinks))
223 }
224 "subs" => {
225 let subs = controller.agent_subs(&agent_id)?;
226 Ok(json_response(&subs))
227 }
228 "desc" => {
229 let desc = controller.agent_desc(&agent_id)?;
230 Ok(json_response_nested(desc, &trunc))
231 }
232 "meta" => {
233 let meta = controller.agent_meta(&agent_id)?;
234 Ok(json_response_nested(meta, &trunc))
235 }
236 "symbols" => {
237 let mut rt = self.rt.lock().await;
238 let symbols = rt.get_symbols(&agent_id).await?;
239 Ok(json_response_nested(symbols, &trunc))
240 }
241 "pkg" => match trunc.as_str() {
242 "/" => {
243 let result = controller.agent_pkg_full(&agent_id)?.map(|r| r.into_dto());
244 Ok(json_response(&result))
245 }
246 "/def" => {
247 let result = controller.agent_pkg_def(&agent_id)?.map(|r| r.into_dto());
248 Ok(json_response(&result))
249 }
250 "/source" => {
251 let result = controller.agent_pkg_source(&agent_id)?;
252 Ok(json_response(&result))
253 }
254 _ => Ok(reject_404()),
255 },
256 "" => {
258 let full_info = controller.agent_full(&agent_id)?;
259 Ok(json_response(&full_info))
260 }
261 _ => Ok(reject_404()),
262 }
263 }
264
265 async fn process_post_rq(&self, req: Request) -> crate::Result<Response> {
266 let path = req.uri().path();
267
268 if path == "/" {
269 return Ok(method_not_allowed());
270 }
271
272 let mut pieces = path.split('/').skip(1);
273
274 let aid_str = match pieces.next() {
276 Some(s) => s,
277 None => return Ok(method_not_allowed()),
278 };
279 let agent_id: AgentId = match aid_str.parse() {
280 Ok(aid) => aid,
281 Err(e) => return Ok(bad_request(format!("failed to parse agent-id - {e}"))),
282 };
283
284 let route = match pieces.next() {
286 Some(r) => r,
287 None => return Ok(reject_404()),
288 };
289
290 let mut trunc = String::new();
292 let mut cnt = 0;
293 for piece in pieces {
294 trunc.push('/');
295 trunc.push_str(piece);
296 cnt += 1;
297 }
298 if cnt > 1 {
300 return Ok(reject_404());
301 }
302 if trunc.is_empty() {
303 trunc.push('/');
304 }
305 if let Some(query) = req.uri().query() {
306 trunc.push('?');
307 trunc.push_str(query);
308 }
309 match route {
310 "action" => {
311 let (parts, payload) = req.into_parts();
313 if !check_json_content(&parts) {
314 return Ok(unsupported_media_type());
315 }
316 let (events, action) = {
317 let mut rt = self.rt.lock().await;
318 rt.set_executor(self.writer)?; match rt
320 .http_post_action(&agent_id, trunc, payload.into(), &self.writer)
321 .await?
322 {
323 Ok(out) => out,
324 Err((status, err)) => {
325 return Ok(err_response(status.try_into().unwrap(), err))
326 }
327 }
328 };
329 match self.event_handler.handle_events(events.clone()).await {
331 Ok(_) => {
332 let resp = ActionResp { events, action };
334 Ok(json_response(&resp))
335 }
336 Err(e) => Ok(into_server_error(e)),
337 }
338 }
339 "subscribe" => {
340 let (parts, payload) = req.into_parts();
342 if !check_json_content(&parts) {
343 return Ok(unsupported_media_type());
344 }
345 let payload: Vec<u8> = payload.into();
347 let dto = match TopicDto::from_bytes(payload.as_slice()) {
348 Ok(dto) => dto,
349 Err(e) => return Ok(bad_request(format!("failed to parse topic - {e}"))),
350 };
351 let topic = Topic::from(dto);
352 if !topic.validate() {
354 return Ok(bad_request("topic contains invalid characters".to_string()));
355 }
356 Controller::new(&self.db)
358 .messages()
359 .subscribe(agent_id, topic)
360 .expect("Handle error");
361 Ok(json_response(&json!({"success": true})))
362 }
363 "unsubscribe" => {
364 let (parts, payload) = req.into_parts();
366 if !check_json_content(&parts) {
367 return Ok(unsupported_media_type());
368 }
369 let payload: Vec<u8> = payload.into();
371 let dto = match TopicDto::from_bytes(payload.as_slice()) {
372 Ok(dto) => dto,
373 Err(e) => return Ok(bad_request(format!("failed to parse topic - {e}"))),
374 };
375 let topic = Topic::from(dto);
376 Controller::new(&self.db)
378 .messages()
379 .unsubscribe(agent_id, topic)
380 .expect("Handle error");
381 Ok(json_response(&json!({"success": true})))
382 }
383 "" => Ok(method_not_allowed()),
384 _ => Ok(reject_404()),
385 }
386 }
387}
388
389impl<E, S> Service<Request> for SwAgentService<E, S>
390where
391 S: Db + 'static,
392 E: EventHandler + 'static,
393{
394 type Response = Response;
395 type Error = Infallible;
396 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
397
398 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
399 Poll::Ready(Ok(()))
400 }
401
402 fn call(&mut self, req: Request) -> Self::Future {
403 let this = self.clone();
404 let fut = async move {
405 let result: Response = match this.process_rq(req).await {
406 Ok(r) => r,
407 Err(e) => into_server_error(e),
408 };
409 Ok(result)
410 };
411 Box::pin(fut)
412 }
413}