// borderless_runtime/http/agent.rs

1pub use super::*;
2use crate::log_shim::*;
3use crate::{db::controller::Controller, rt::agent::Runtime};
4use borderless::events::Message;
5use borderless::{
6    events::{CallAction, Events},
7    http::queries::Pagination,
8    AgentId, BorderlessId,
9};
10use borderless_kv_store::{backend::lmdb::Lmdb, Db};
11use http::method::Method;
12use std::collections::VecDeque;
13use std::convert::Infallible;
14use std::future::Future;
15use std::{
16    pin::Pin,
17    sync::Arc,
18    task::{Context, Poll},
19    time::Instant,
20};
21use tokio::sync::Mutex;
22
23#[derive(Serialize)]
24pub struct ActionResp {
25    pub events: Events,
26    pub action: CallAction,
27}
28
29pub trait EventHandler: Clone + Send + Sync {
30    type Error: std::fmt::Display + Send + Sync;
31
32    fn handle_events(&self, events: Events)
33        -> impl Future<Output = Result<(), Self::Error>> + Send;
34}
35
/// Event-handler stub that silently discards every event it receives.
///
/// Handy in tests where the produced events are irrelevant.
#[derive(Clone)]
pub struct NoEventHandler;
41
42impl EventHandler for NoEventHandler {
43    type Error = Infallible;
44
45    async fn handle_events(&self, _events: Events) -> Result<(), Self::Error> {
46        Ok(())
47    }
48}
49
50/// A dummy implementation of an event-handler, that immediately applies all agent events
51///
52/// Discards all contract events in the process.
53///
54/// Useful for testing.
55#[derive(Clone)]
56pub struct RecursiveEventHandler<S: Db> {
57    pub rt: Arc<Mutex<Runtime<S>>>,
58}
59
60impl<S: Db> EventHandler for RecursiveEventHandler<S> {
61    type Error = crate::Error;
62
63    async fn handle_events(&self, events: Events) -> Result<(), Self::Error> {
64        let mut rt = self.rt.lock().await;
65        let db = rt.get_db();
66        let sub_handler = Controller::new(&db).messages();
67
68        let mut agent_events: VecDeque<_> = events.local.into();
69        // Handle events one by one
70        while let Some(Message {
71            publisher,
72            topic,
73            value,
74        }) = agent_events.pop_front()
75        {
76            let subscribers = sub_handler.get_topic_subscribers(publisher, topic)?;
77
78            for (subscriber, method) in subscribers {
79                let action = CallAction::by_method(method, value.clone());
80                // Queue all process events and apply them again
81                if let Some(events) = rt.process_action(&subscriber, action).await? {
82                    agent_events.extend(events.local);
83                }
84            }
85        }
86        Ok(())
87    }
88}
89
90/// Simple service around the runtime
91#[derive(Clone)]
92pub struct SwAgentService<E, S = Lmdb>
93where
94    S: Db + 'static,
95    E: EventHandler,
96{
97    rt: Arc<Mutex<Runtime<S>>>,
98    db: S,
99    // TODO: This is not optimal. The runtime is not tied to a tx-writer,
100    // and for our multi-tenant contract-node we require this to be flexible.
101    writer: BorderlessId,
102    event_handler: E,
103}
104
105impl<S, E> SwAgentService<E, S>
106where
107    S: Db + 'static,
108    E: EventHandler,
109{
110    pub fn new(db: S, rt: Runtime<S>, writer: BorderlessId, event_handler: E) -> Self {
111        Self {
112            rt: Arc::new(Mutex::new(rt)),
113            db,
114            writer,
115            event_handler,
116        }
117    }
118
119    pub fn with_shared(
120        db: S,
121        rt: Arc<Mutex<Runtime<S>>>,
122        event_handler: E,
123        writer: BorderlessId,
124    ) -> Self {
125        Self {
126            rt,
127            db,
128            writer,
129            event_handler,
130        }
131    }
132
133    async fn process_rq(&self, req: Request) -> crate::Result<Response> {
134        let start = Instant::now();
135        let path = req.uri().path().to_string();
136        let result = match *req.method() {
137            Method::GET => self.process_get_rq(req).await,
138            Method::POST => self.process_post_rq(req).await,
139            _ => Ok(method_not_allowed()),
140        };
141        let elapsed = start.elapsed();
142        // TODO: I don't know if this should be logged every time
143        match &result {
144            Ok(res) => info!(
145                "Request success. path={path}. Time elapsed: {elapsed:?}, status={}",
146                res.status()
147            ),
148            Err(e) => warn!("Request failed. path={path}. Time elapsed: {elapsed:?}, error={e}"),
149        }
150        result
151    }
152
153    async fn process_get_rq(&self, req: Request) -> crate::Result<Response> {
154        let path = req.uri().path();
155        let query = req.uri().query();
156
157        if path == "/" {
158            let agents = self.rt.lock().await.available_agents()?;
159            return Ok(json_response(&agents));
160        }
161
162        let mut pieces = path.split('/').skip(1);
163
164        // Extract agent-id from first piece
165        let agent_id: AgentId = match pieces.next().and_then(|first| first.parse().ok()) {
166            Some(aid) => aid,
167            None => return Ok(reject_404()),
168        };
169        let controller = Controller::new(&self.db);
170
171        // Ensure, that the agent exists
172        if !controller.agent_exists(&agent_id)? {
173            return Ok(reject_404());
174        }
175
176        // Get top-level route
177        let route = match pieces.next() {
178            Some(r) => r,
179            None => {
180                // Get full agent info
181                let full_info = controller.agent_full(&agent_id)?;
182                return Ok(json_response(&full_info));
183            }
184        };
185
186        // Build truncated path
187        let mut trunc = String::new();
188        for piece in pieces {
189            trunc.push('/');
190            trunc.push_str(piece);
191        }
192        if trunc.is_empty() {
193            trunc.push('/');
194        }
195        if let Some(query) = query {
196            trunc.push('?');
197            trunc.push_str(query);
198        }
199        match route {
200            "state" => {
201                // TODO: The agent should also parse query parameters !
202                let mut rt = self.rt.lock().await;
203                let (status, payload) = rt.http_get_state(&agent_id, trunc).await?;
204                if status == 200 {
205                    Ok(json_body(payload))
206                } else {
207                    Ok(reject_404())
208                }
209            }
210            "logs" => {
211                // Extract pagination
212                let pagination = Pagination::from_query(query).unwrap_or_default();
213
214                // Get logs
215                let log = controller.logs(agent_id).get_logs_paginated(pagination)?;
216
217                Ok(json_response(&log))
218            }
219            "sinks" => {
220                let sinks = controller.agent_sinks(&agent_id)?;
221                Ok(json_response(&sinks))
222            }
223            "subs" => {
224                let subs = controller.agent_subs(&agent_id)?;
225                Ok(json_response(&subs))
226            }
227            "desc" => {
228                let desc = controller.agent_desc(&agent_id)?;
229                Ok(json_response_nested(desc, &trunc))
230            }
231            "meta" => {
232                let meta = controller.agent_meta(&agent_id)?;
233                Ok(json_response_nested(meta, &trunc))
234            }
235            "symbols" => {
236                let mut rt = self.rt.lock().await;
237                let symbols = rt.get_symbols(&agent_id).await?;
238                Ok(json_response_nested(symbols, &trunc))
239            }
240            "pkg" => match trunc.as_str() {
241                "/" => {
242                    let result = controller.agent_pkg_full(&agent_id)?.map(|r| r.into_dto());
243                    Ok(json_response(&result))
244                }
245                "/def" => {
246                    let result = controller.agent_pkg_def(&agent_id)?.map(|r| r.into_dto());
247                    Ok(json_response(&result))
248                }
249                "/source" => {
250                    let result = controller.agent_pkg_source(&agent_id)?;
251                    Ok(json_response(&result))
252                }
253                _ => Ok(reject_404()),
254            },
255            // Same as empty path
256            "" => {
257                let full_info = controller.agent_full(&agent_id)?;
258                Ok(json_response(&full_info))
259            }
260            _ => Ok(reject_404()),
261        }
262    }
263
264    async fn process_post_rq(&self, req: Request) -> crate::Result<Response> {
265        let path = req.uri().path();
266
267        if path == "/" {
268            return Ok(method_not_allowed());
269        }
270
271        let mut pieces = path.split('/').skip(1);
272
273        // Extract agent-id from first piece
274        let aid_str = match pieces.next() {
275            Some(s) => s,
276            None => return Ok(method_not_allowed()),
277        };
278        let agent_id: AgentId = match aid_str.parse() {
279            Ok(aid) => aid,
280            Err(e) => return Ok(bad_request(format!("failed to parse agent-id - {e}"))),
281        };
282
283        // Get top-level route
284        let route = match pieces.next() {
285            Some(r) => r,
286            None => return Ok(reject_404()),
287        };
288
289        // Build truncated path
290        let mut trunc = String::new();
291        let mut cnt = 0;
292        for piece in pieces {
293            trunc.push('/');
294            trunc.push_str(piece);
295            cnt += 1;
296        }
297        // NOTE: The action route only has one additional path parameter
298        if cnt > 1 {
299            return Ok(reject_404());
300        }
301        if trunc.is_empty() {
302            trunc.push('/');
303        }
304        if let Some(query) = req.uri().query() {
305            trunc.push('?');
306            trunc.push_str(query);
307        }
308        match route {
309            "action" => {
310                // Check request header
311                let (parts, payload) = req.into_parts();
312                if !check_json_content(&parts) {
313                    return Ok(unsupported_media_type());
314                }
315                let (events, action) = {
316                    let mut rt = self.rt.lock().await;
317                    rt.set_executor(self.writer)?; // For agents the executor and the writer are actually the same
318                    match rt
319                        .http_post_action(&agent_id, trunc, payload.into(), &self.writer)
320                        .await?
321                    {
322                        Ok(out) => out,
323                        Err((status, err)) => {
324                            return Ok(err_response(status.try_into().unwrap(), err))
325                        }
326                    }
327                };
328                // Forward the events
329                match self.event_handler.handle_events(events.clone()).await {
330                    Ok(_) => {
331                        // Build action response
332                        let resp = ActionResp { events, action };
333                        Ok(json_response(&resp))
334                    }
335                    Err(e) => Ok(into_server_error(e)),
336                }
337            }
338            "" => Ok(method_not_allowed()),
339            _ => Ok(reject_404()),
340        }
341    }
342}
343
344impl<E, S> Service<Request> for SwAgentService<E, S>
345where
346    S: Db + 'static,
347    E: EventHandler + 'static,
348{
349    type Response = Response;
350    type Error = Infallible;
351    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
352
353    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
354        Poll::Ready(Ok(()))
355    }
356
357    fn call(&mut self, req: Request) -> Self::Future {
358        let this = self.clone();
359        let fut = async move {
360            let result: Response = match this.process_rq(req).await {
361                Ok(r) => r,
362                Err(e) => into_server_error(e),
363            };
364            Ok(result)
365        };
366        Box::pin(fut)
367    }
368}