borderless_runtime/http/
agent.rs

1use borderless::{
2    events::{AgentCall, CallAction, Events},
3    http::queries::Pagination,
4    AgentId, BorderlessId,
5};
6use borderless_kv_store::{backend::lmdb::Lmdb, Db};
7use http::method::Method;
8use std::collections::VecDeque;
9use std::convert::Infallible;
10use std::future::Future;
11use std::{
12    pin::Pin,
13    sync::Arc,
14    task::{Context, Poll},
15    time::Instant,
16};
17use tokio::sync::Mutex;
18
19pub use super::*;
20use crate::log_shim::*;
21use crate::{db::controller::Controller, rt::agent::Runtime};
22
23#[derive(Serialize)]
24pub struct ActionResp {
25    pub events: Events,
26    pub action: CallAction,
27}
28
29pub trait EventHandler: Clone + Send + Sync {
30    type Error: std::fmt::Display + Send + Sync;
31
32    fn handle_events(&self, events: Events)
33        -> impl Future<Output = Result<(), Self::Error>> + Send;
34}
35
36/// A dummy implementation of an event-handler, that does nothing with the events.
37///
38/// Useful for testing.
39#[derive(Clone)]
40pub struct NoEventHandler;
41
42impl EventHandler for NoEventHandler {
43    type Error = Infallible;
44
45    async fn handle_events(&self, _events: Events) -> Result<(), Self::Error> {
46        Ok(())
47    }
48}
49
50/// A dummy implementation of an event-handler, that immediately applies all agent events
51///
52/// Discards all contract events in the process.
53///
54/// Useful for testing.
55#[derive(Clone)]
56pub struct RecursiveEventHandler<S: Db> {
57    pub rt: Arc<Mutex<Runtime<S>>>,
58}
59
60impl<S: Db> EventHandler for RecursiveEventHandler<S> {
61    type Error = crate::Error;
62
63    async fn handle_events(&self, events: Events) -> Result<(), Self::Error> {
64        let mut rt = self.rt.lock().await;
65        let mut agent_events = VecDeque::with_capacity(events.local.len());
66        agent_events.extend(events.local);
67
68        // Handle events one by one
69        while let Some(AgentCall { agent_id, action }) = agent_events.pop_front() {
70            // Queue all process events and apply them again
71            if let Some(events) = rt.process_action(&agent_id, action).await? {
72                agent_events.extend(events.local);
73            }
74        }
75        Ok(())
76    }
77}
78
79/// Simple service around the runtime
80#[derive(Clone)]
81pub struct SwAgentService<E, S = Lmdb>
82where
83    S: Db + 'static,
84    E: EventHandler,
85{
86    rt: Arc<Mutex<Runtime<S>>>,
87    db: S,
88    // TODO: This is not optimal. The runtime is not tied to a tx-writer,
89    // and for our multi-tenant contract-node we require this to be flexible.
90    writer: BorderlessId,
91    event_handler: E,
92}
93
94impl<S, E> SwAgentService<E, S>
95where
96    S: Db + 'static,
97    E: EventHandler,
98{
99    pub fn new(db: S, rt: Runtime<S>, writer: BorderlessId, event_handler: E) -> Self {
100        Self {
101            rt: Arc::new(Mutex::new(rt)),
102            db,
103            writer,
104            event_handler,
105        }
106    }
107
108    pub fn with_shared(
109        db: S,
110        rt: Arc<Mutex<Runtime<S>>>,
111        event_handler: E,
112        writer: BorderlessId,
113    ) -> Self {
114        Self {
115            rt,
116            db,
117            writer,
118            event_handler,
119        }
120    }
121
122    async fn process_rq(&self, req: Request) -> crate::Result<Response> {
123        let start = Instant::now();
124        let path = req.uri().path().to_string();
125        let result = match *req.method() {
126            Method::GET => self.process_get_rq(req).await,
127            Method::POST => self.process_post_rq(req).await,
128            _ => Ok(method_not_allowed()),
129        };
130        let elapsed = start.elapsed();
131        info!("Finished executing request. path={path}. Time elapsed: {elapsed:?}");
132        result
133    }
134
135    async fn process_get_rq(&self, req: Request) -> crate::Result<Response> {
136        let path = req.uri().path();
137        let query = req.uri().query();
138
139        if path == "/" {
140            let agents = self.rt.lock().await.available_agents()?;
141            return Ok(json_response(&agents));
142        }
143
144        let mut pieces = path.split('/').skip(1);
145
146        // Extract agent-id from first piece
147        let agent_id: AgentId = match pieces.next().and_then(|first| first.parse().ok()) {
148            Some(aid) => aid,
149            None => return Ok(reject_404()),
150        };
151        let controller = Controller::new(&self.db);
152
153        // Ensure, that the agent exists
154        if !controller.agent_exists(&agent_id)? {
155            return Ok(reject_404());
156        }
157
158        // Get top-level route
159        let route = match pieces.next() {
160            Some(r) => r,
161            None => {
162                // Get full agent info
163                let full_info = controller.agent_full(&agent_id)?;
164                return Ok(json_response(&full_info));
165            }
166        };
167
168        // Build truncated path
169        let mut trunc = String::new();
170        for piece in pieces {
171            trunc.push('/');
172            trunc.push_str(piece);
173        }
174        if trunc.is_empty() {
175            trunc.push('/');
176        }
177        if let Some(query) = query {
178            trunc.push('?');
179            trunc.push_str(query);
180        }
181        match route {
182            "state" => {
183                // TODO: The agent should also parse query parameters !
184                let mut rt = self.rt.lock().await;
185                let (status, payload) = rt.http_get_state(&agent_id, trunc).await?;
186                if status == 200 {
187                    Ok(json_body(payload))
188                } else {
189                    Ok(reject_404())
190                }
191            }
192            "logs" => {
193                // Extract pagination
194                let pagination = Pagination::from_query(query).unwrap_or_default();
195
196                // Get logs
197                let log = controller.logs(agent_id).get_logs_paginated(pagination)?;
198
199                Ok(json_response(&log))
200            }
201            "sinks" => {
202                let sinks = controller.agent_sinks(&agent_id)?;
203                Ok(json_response(&sinks))
204            }
205            "desc" => {
206                let desc = controller.agent_desc(&agent_id)?;
207                Ok(json_response_nested(desc, &trunc))
208            }
209            "meta" => {
210                let meta = controller.agent_meta(&agent_id)?;
211                Ok(json_response_nested(meta, &trunc))
212            }
213            "symbols" => {
214                let mut rt = self.rt.lock().await;
215                let symbols = rt.get_symbols(&agent_id).await?;
216                Ok(json_response_nested(symbols, &trunc))
217            }
218            "pkg" => match trunc.as_str() {
219                "/" => {
220                    let result = controller.agent_pkg_full(&agent_id)?.map(|r| r.into_dto());
221                    Ok(json_response(&result))
222                }
223                "/def" => {
224                    let result = controller.agent_pkg_def(&agent_id)?.map(|r| r.into_dto());
225                    Ok(json_response(&result))
226                }
227                "/source" => {
228                    let result = controller.agent_pkg_source(&agent_id)?;
229                    Ok(json_response(&result))
230                }
231                _ => Ok(reject_404()),
232            },
233            // Same as empty path
234            "" => {
235                let full_info = controller.agent_full(&agent_id)?;
236                Ok(json_response(&full_info))
237            }
238            _ => Ok(reject_404()),
239        }
240    }
241
242    async fn process_post_rq(&self, req: Request) -> crate::Result<Response> {
243        let path = req.uri().path();
244
245        if path == "/" {
246            return Ok(method_not_allowed());
247        }
248
249        let mut pieces = path.split('/').skip(1);
250
251        // Extract agent-id from first piece
252        let aid_str = match pieces.next() {
253            Some(s) => s,
254            None => return Ok(method_not_allowed()),
255        };
256        let agent_id: AgentId = match aid_str.parse() {
257            Ok(aid) => aid,
258            Err(e) => return Ok(bad_request(format!("failed to parse agent-id - {e}"))),
259        };
260
261        // Get top-level route
262        let route = match pieces.next() {
263            Some(r) => r,
264            None => return Ok(reject_404()),
265        };
266
267        // Build truncated path
268        let mut trunc = String::new();
269        let mut cnt = 0;
270        for piece in pieces {
271            trunc.push('/');
272            trunc.push_str(piece);
273            cnt += 1;
274        }
275        // NOTE: The action route only has one additional path parameter
276        if cnt > 1 {
277            return Ok(reject_404());
278        }
279        if trunc.is_empty() {
280            trunc.push('/');
281        }
282        if let Some(query) = req.uri().query() {
283            trunc.push('?');
284            trunc.push_str(query);
285        }
286        match route {
287            "action" => {
288                // Check request header
289                let (parts, payload) = req.into_parts();
290                if !check_json_content(&parts) {
291                    return Ok(unsupported_media_type());
292                }
293                let (events, action) = {
294                    let mut rt = self.rt.lock().await;
295                    rt.set_executor(self.writer)?; // For agents the executor and the writer are actually the same
296                    match rt
297                        .http_post_action(&agent_id, trunc, payload.into(), &self.writer)
298                        .await?
299                    {
300                        Ok(out) => out,
301                        Err((status, err)) => {
302                            return Ok(err_response(status.try_into().unwrap(), err))
303                        }
304                    }
305                };
306                // Forward the events
307                match self.event_handler.handle_events(events.clone()).await {
308                    Ok(_) => {
309                        // Build action response
310                        let resp = ActionResp { events, action };
311                        Ok(json_response(&resp))
312                    }
313                    Err(e) => Ok(into_server_error(e)),
314                }
315            }
316            "" => Ok(method_not_allowed()),
317            _ => Ok(reject_404()),
318        }
319    }
320}
321
322impl<E, S> Service<Request> for SwAgentService<E, S>
323where
324    S: Db + 'static,
325    E: EventHandler + 'static,
326{
327    type Response = Response;
328    type Error = Infallible;
329    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
330
331    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
332        Poll::Ready(Ok(()))
333    }
334
335    fn call(&mut self, req: Request) -> Self::Future {
336        let this = self.clone();
337        let fut = async move {
338            let result: Response = match this.process_rq(req).await {
339                Ok(r) => r,
340                Err(e) => into_server_error(e),
341            };
342            Ok(result)
343        };
344        Box::pin(fut)
345    }
346}