Skip to main content

borderless_runtime/http/
agent.rs

1pub use super::*;
2use crate::log_shim::*;
3use crate::{db::controller::Controller, rt::agent::Runtime};
4use borderless::events::{Message, Topic, TopicDto};
5use borderless::{
6    events::{CallAction, Events},
7    http::queries::Pagination,
8    AgentId, BorderlessId,
9};
10use borderless_kv_store::{backend::lmdb::Lmdb, Db};
11use http::method::Method;
12use serde_json::json;
13use std::collections::VecDeque;
14use std::convert::Infallible;
15use std::future::Future;
16use std::{
17    pin::Pin,
18    sync::Arc,
19    task::{Context, Poll},
20    time::Instant,
21};
22use tokio::sync::Mutex;
23
24#[derive(Serialize)]
25pub struct ActionResp {
26    pub events: Events,
27    pub action: CallAction,
28}
29
30pub trait EventHandler: Clone + Send + Sync {
31    type Error: std::fmt::Display + Send + Sync;
32
33    fn handle_events(&self, events: Events)
34        -> impl Future<Output = Result<(), Self::Error>> + Send;
35}
36
37/// A dummy implementation of an event-handler, that does nothing with the events.
38///
39/// Useful for testing.
40#[derive(Clone)]
41pub struct NoEventHandler;
42
43impl EventHandler for NoEventHandler {
44    type Error = Infallible;
45
46    async fn handle_events(&self, _events: Events) -> Result<(), Self::Error> {
47        Ok(())
48    }
49}
50
51/// A dummy implementation of an event-handler, that immediately applies all agent events
52///
53/// Discards all contract events in the process.
54///
55/// Useful for testing.
56#[derive(Clone)]
57pub struct RecursiveEventHandler<S: Db> {
58    pub rt: Arc<Mutex<Runtime<S>>>,
59}
60
61impl<S: Db> EventHandler for RecursiveEventHandler<S> {
62    type Error = crate::Error;
63
64    async fn handle_events(&self, events: Events) -> Result<(), Self::Error> {
65        let mut rt = self.rt.lock().await;
66        let db = rt.get_db();
67        let sub_handler = Controller::new(&db).messages();
68
69        let mut agent_events: VecDeque<_> = events.local.into();
70        // Handle events one by one
71        while let Some(Message {
72            publisher,
73            topic,
74            value,
75        }) = agent_events.pop_front()
76        {
77            let subscribers = sub_handler.get_topic_subscribers(publisher, topic)?;
78
79            for (subscriber, method) in subscribers {
80                let action = CallAction::by_method(method, value.clone());
81                // Queue all process events and apply them again
82                if let Some(events) = rt.process_action(&subscriber, action).await? {
83                    agent_events.extend(events.local);
84                }
85            }
86        }
87        Ok(())
88    }
89}
90
91/// Simple service around the runtime
92#[derive(Clone)]
93pub struct SwAgentService<E, S = Lmdb>
94where
95    S: Db + 'static,
96    E: EventHandler,
97{
98    rt: Arc<Mutex<Runtime<S>>>,
99    db: S,
100    // TODO: This is not optimal. The runtime is not tied to a tx-writer,
101    // and for our multi-tenant contract-node we require this to be flexible.
102    writer: BorderlessId,
103    event_handler: E,
104}
105
106impl<S, E> SwAgentService<E, S>
107where
108    S: Db + 'static,
109    E: EventHandler,
110{
111    pub fn new(db: S, rt: Runtime<S>, writer: BorderlessId, event_handler: E) -> Self {
112        Self {
113            rt: Arc::new(Mutex::new(rt)),
114            db,
115            writer,
116            event_handler,
117        }
118    }
119
120    pub fn with_shared(
121        db: S,
122        rt: Arc<Mutex<Runtime<S>>>,
123        event_handler: E,
124        writer: BorderlessId,
125    ) -> Self {
126        Self {
127            rt,
128            db,
129            writer,
130            event_handler,
131        }
132    }
133
134    async fn process_rq(&self, req: Request) -> crate::Result<Response> {
135        let start = Instant::now();
136        let path = req.uri().path().to_string();
137        let result = match *req.method() {
138            Method::GET => self.process_get_rq(req).await,
139            Method::POST => self.process_post_rq(req).await,
140            _ => Ok(method_not_allowed()),
141        };
142        let elapsed = start.elapsed();
143        // TODO: I don't know if this should be logged every time
144        match &result {
145            Ok(res) => info!(
146                "Request success. path={path}. Time elapsed: {elapsed:?}, status={}",
147                res.status()
148            ),
149            Err(e) => warn!("Request failed. path={path}. Time elapsed: {elapsed:?}, error={e}"),
150        }
151        result
152    }
153
154    async fn process_get_rq(&self, req: Request) -> crate::Result<Response> {
155        let path = req.uri().path();
156        let query = req.uri().query();
157
158        if path == "/" {
159            let agents = self.rt.lock().await.available_agents()?;
160            return Ok(json_response(&agents));
161        }
162
163        let mut pieces = path.split('/').skip(1);
164
165        // Extract agent-id from first piece
166        let agent_id: AgentId = match pieces.next().and_then(|first| first.parse().ok()) {
167            Some(aid) => aid,
168            None => return Ok(reject_404()),
169        };
170        let controller = Controller::new(&self.db);
171
172        // Ensure, that the agent exists
173        if !controller.agent_exists(&agent_id)? {
174            return Ok(reject_404());
175        }
176
177        // Get top-level route
178        let route = match pieces.next() {
179            Some(r) => r,
180            None => {
181                // Get full agent info
182                let full_info = controller.agent_full(&agent_id)?;
183                return Ok(json_response(&full_info));
184            }
185        };
186
187        // Build truncated path
188        let mut trunc = String::new();
189        for piece in pieces {
190            trunc.push('/');
191            trunc.push_str(piece);
192        }
193        if trunc.is_empty() {
194            trunc.push('/');
195        }
196        if let Some(query) = query {
197            trunc.push('?');
198            trunc.push_str(query);
199        }
200        match route {
201            "state" => {
202                // TODO: The agent should also parse query parameters !
203                let mut rt = self.rt.lock().await;
204                let (status, payload) = rt.http_get_state(&agent_id, trunc).await?;
205                if status == 200 {
206                    Ok(json_body(payload))
207                } else {
208                    Ok(reject_404())
209                }
210            }
211            "logs" => {
212                // Extract pagination
213                let pagination = Pagination::from_query(query).unwrap_or_default();
214
215                // Get logs
216                let log = controller.logs(agent_id).get_logs_paginated(pagination)?;
217
218                Ok(json_response(&log))
219            }
220            "sinks" => {
221                let sinks = controller.agent_sinks(&agent_id)?;
222                Ok(json_response(&sinks))
223            }
224            "subs" => {
225                let subs = controller.agent_subs(&agent_id)?;
226                Ok(json_response(&subs))
227            }
228            "desc" => {
229                let desc = controller.agent_desc(&agent_id)?;
230                Ok(json_response_nested(desc, &trunc))
231            }
232            "meta" => {
233                let meta = controller.agent_meta(&agent_id)?;
234                Ok(json_response_nested(meta, &trunc))
235            }
236            "symbols" => {
237                let mut rt = self.rt.lock().await;
238                let symbols = rt.get_symbols(&agent_id).await?;
239                Ok(json_response_nested(symbols, &trunc))
240            }
241            "pkg" => match trunc.as_str() {
242                "/" => {
243                    let result = controller.agent_pkg_full(&agent_id)?.map(|r| r.into_dto());
244                    Ok(json_response(&result))
245                }
246                "/def" => {
247                    let result = controller.agent_pkg_def(&agent_id)?.map(|r| r.into_dto());
248                    Ok(json_response(&result))
249                }
250                "/source" => {
251                    let result = controller.agent_pkg_source(&agent_id)?;
252                    Ok(json_response(&result))
253                }
254                _ => Ok(reject_404()),
255            },
256            // Same as empty path
257            "" => {
258                let full_info = controller.agent_full(&agent_id)?;
259                Ok(json_response(&full_info))
260            }
261            _ => Ok(reject_404()),
262        }
263    }
264
265    async fn process_post_rq(&self, req: Request) -> crate::Result<Response> {
266        let path = req.uri().path();
267
268        if path == "/" {
269            return Ok(method_not_allowed());
270        }
271
272        let mut pieces = path.split('/').skip(1);
273
274        // Extract agent-id from first piece
275        let aid_str = match pieces.next() {
276            Some(s) => s,
277            None => return Ok(method_not_allowed()),
278        };
279        let agent_id: AgentId = match aid_str.parse() {
280            Ok(aid) => aid,
281            Err(e) => return Ok(bad_request(format!("failed to parse agent-id - {e}"))),
282        };
283
284        // Get top-level route
285        let route = match pieces.next() {
286            Some(r) => r,
287            None => return Ok(reject_404()),
288        };
289
290        // Build truncated path
291        let mut trunc = String::new();
292        let mut cnt = 0;
293        for piece in pieces {
294            trunc.push('/');
295            trunc.push_str(piece);
296            cnt += 1;
297        }
298        // NOTE: The action route only has one additional path parameter
299        if cnt > 1 {
300            return Ok(reject_404());
301        }
302        if trunc.is_empty() {
303            trunc.push('/');
304        }
305        if let Some(query) = req.uri().query() {
306            trunc.push('?');
307            trunc.push_str(query);
308        }
309        match route {
310            "action" => {
311                // Check request header
312                let (parts, payload) = req.into_parts();
313                if !check_json_content(&parts) {
314                    return Ok(unsupported_media_type());
315                }
316                let (events, action) = {
317                    let mut rt = self.rt.lock().await;
318                    rt.set_executor(self.writer)?; // For agents the executor and the writer are actually the same
319                    match rt
320                        .http_post_action(&agent_id, trunc, payload.into(), &self.writer)
321                        .await?
322                    {
323                        Ok(out) => out,
324                        Err((status, err)) => {
325                            return Ok(err_response(status.try_into().unwrap(), err))
326                        }
327                    }
328                };
329                // Forward the events
330                match self.event_handler.handle_events(events.clone()).await {
331                    Ok(_) => {
332                        // Build action response
333                        let resp = ActionResp { events, action };
334                        Ok(json_response(&resp))
335                    }
336                    Err(e) => Ok(into_server_error(e)),
337                }
338            }
339            "subscribe" => {
340                // Check request header
341                let (parts, payload) = req.into_parts();
342                if !check_json_content(&parts) {
343                    return Ok(unsupported_media_type());
344                }
345                // Extract Topic from request
346                let payload: Vec<u8> = payload.into();
347                let dto = match TopicDto::from_bytes(payload.as_slice()) {
348                    Ok(dto) => dto,
349                    Err(e) => return Ok(bad_request(format!("failed to parse topic - {e}"))),
350                };
351                let topic = Topic::from(dto);
352                // Control that there are no newline characters in topic
353                if !topic.validate() {
354                    return Ok(bad_request("topic contains invalid characters".to_string()));
355                }
356                // Start subscription
357                Controller::new(&self.db)
358                    .messages()
359                    .subscribe(agent_id, topic)
360                    .expect("Handle error");
361                Ok(json_response(&json!({"success": true})))
362            }
363            "unsubscribe" => {
364                // Check request header
365                let (parts, payload) = req.into_parts();
366                if !check_json_content(&parts) {
367                    return Ok(unsupported_media_type());
368                }
369                // Extract Topic from request
370                let payload: Vec<u8> = payload.into();
371                let dto = match TopicDto::from_bytes(payload.as_slice()) {
372                    Ok(dto) => dto,
373                    Err(e) => return Ok(bad_request(format!("failed to parse topic - {e}"))),
374                };
375                let topic = Topic::from(dto);
376                // Stop subscription
377                Controller::new(&self.db)
378                    .messages()
379                    .unsubscribe(agent_id, topic)
380                    .expect("Handle error");
381                Ok(json_response(&json!({"success": true})))
382            }
383            "" => Ok(method_not_allowed()),
384            _ => Ok(reject_404()),
385        }
386    }
387}
388
389impl<E, S> Service<Request> for SwAgentService<E, S>
390where
391    S: Db + 'static,
392    E: EventHandler + 'static,
393{
394    type Response = Response;
395    type Error = Infallible;
396    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
397
398    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
399        Poll::Ready(Ok(()))
400    }
401
402    fn call(&mut self, req: Request) -> Self::Future {
403        let this = self.clone();
404        let fut = async move {
405            let result: Response = match this.process_rq(req).await {
406                Ok(r) => r,
407                Err(e) => into_server_error(e),
408            };
409            Ok(result)
410        };
411        Box::pin(fut)
412    }
413}