pub use super::*;
use crate::log_shim::*;
use crate::{db::controller::Controller, rt::agent::Runtime};
use borderless::events::Message;
use borderless::{
events::{CallAction, Events},
http::queries::Pagination,
AgentId, BorderlessId,
};
use borderless_kv_store::{backend::lmdb::Lmdb, Db};
use http::method::Method;
use std::collections::VecDeque;
use std::convert::Infallible;
use std::future::Future;
use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Instant,
};
use tokio::sync::Mutex;
/// Response payload for a successfully executed agent action.
///
/// Serialized as the body returned by the `action` POST route once the
/// produced events have been handled.
#[derive(Serialize)]
pub struct ActionResp {
/// Events the runtime returned for the executed action.
pub events: Events,
/// The action the runtime returned alongside those events.
pub action: CallAction,
}
/// Hook that receives the [`Events`] produced by an agent action.
///
/// Implementors must be `Clone + Send + Sync`.
pub trait EventHandler: Clone + Send + Sync {
/// Error reported when handling fails. Only `Display` (plus `Send + Sync`)
/// is required of it.
type Error: std::fmt::Display + Send + Sync;
/// Handles `events`, resolving to `Ok(())` on success.
///
/// The returned future must be `Send`.
fn handle_events(&self, events: Events)
-> impl Future<Output = Result<(), Self::Error>> + Send;
}
#[derive(Clone)]
pub struct NoEventHandler;
impl EventHandler for NoEventHandler {
type Error = Infallible;
async fn handle_events(&self, _events: Events) -> Result<(), Self::Error> {
Ok(())
}
}
#[derive(Clone)]
pub struct RecursiveEventHandler<S: Db> {
pub rt: Arc<Mutex<Runtime<S>>>,
}
impl<S: Db> EventHandler for RecursiveEventHandler<S> {
type Error = crate::Error;
async fn handle_events(&self, events: Events) -> Result<(), Self::Error> {
let mut rt = self.rt.lock().await;
let db = rt.get_db();
let sub_handler = Controller::new(&db).messages();
let mut agent_events: VecDeque<_> = events.local.into();
while let Some(Message {
publisher,
topic,
value,
}) = agent_events.pop_front()
{
let subscribers = sub_handler.get_topic_subscribers(publisher, topic)?;
for (subscriber, method) in subscribers {
let action = CallAction::by_method(method, value.clone());
if let Some(events) = rt.process_action(&subscriber, action).await? {
agent_events.extend(events.local);
}
}
}
Ok(())
}
}
/// HTTP service exposing software agents.
///
/// `E` is the [`EventHandler`] invoked with the events of a POSTed action;
/// `S` is the storage backend and defaults to [`Lmdb`].
#[derive(Clone)]
pub struct SwAgentService<E, S = Lmdb>
where
S: Db + 'static,
E: EventHandler,
{
// Runtime shared behind an async mutex; locked for runtime-backed requests.
rt: Arc<Mutex<Runtime<S>>>,
// Database handle used to build a `Controller` for read-only queries.
db: S,
// Identity passed to the runtime as executor/writer of POSTed actions.
writer: BorderlessId,
// Receives the events produced by a POSTed action.
event_handler: E,
}
impl<S, E> SwAgentService<E, S>
where
S: Db + 'static,
E: EventHandler,
{
pub fn new(db: S, rt: Runtime<S>, writer: BorderlessId, event_handler: E) -> Self {
Self {
rt: Arc::new(Mutex::new(rt)),
db,
writer,
event_handler,
}
}
/// Creates a service around an already shared runtime handle.
///
/// Note that the argument order differs from [`Self::new`]: here
/// `event_handler` comes before `writer`.
pub fn with_shared(
db: S,
rt: Arc<Mutex<Runtime<S>>>,
event_handler: E,
writer: BorderlessId,
) -> Self {
Self {
rt,
db,
writer,
event_handler,
}
}
/// Dispatches a request by HTTP method and logs the outcome.
///
/// `GET` and `POST` are forwarded to their handlers; every other method is
/// answered with `method_not_allowed()`. The elapsed time is logged at
/// `info` level for `Ok` results and at `warn` level for errors.
async fn process_rq(&self, req: Request) -> crate::Result<Response> {
    let started = Instant::now();
    // Copy the path up front: `req` is consumed by the handlers below.
    let path = req.uri().path().to_owned();

    let outcome = if *req.method() == Method::GET {
        self.process_get_rq(req).await
    } else if *req.method() == Method::POST {
        self.process_post_rq(req).await
    } else {
        Ok(method_not_allowed())
    };

    let elapsed = started.elapsed();
    match outcome.as_ref() {
        Err(e) => warn!("Request failed. path={path}. Time elapsed: {elapsed:?}, error={e}"),
        Ok(res) => info!(
            "Request success. path={path}. Time elapsed: {elapsed:?}, status={}",
            res.status()
        ),
    }
    outcome
}
/// Serves `GET` requests.
///
/// * `/` returns the runtime's `available_agents()`.
/// * `/{agent_id}` and `/{agent_id}/` return `agent_full`.
/// * `/{agent_id}/{route}[/rest...]` serves `state`, `logs`, `sinks`, `subs`,
///   `desc`, `meta`, `symbols` and `pkg` (`/`, `/def`, `/source`).
///
/// The path after the route, followed by `?query` when a query string is
/// present, is passed on as the sub-path (`/` when nothing follows the
/// route). Unparsable or unknown agent ids and unknown routes are answered
/// with `reject_404()`.
async fn process_get_rq(&self, req: Request) -> crate::Result<Response> {
    let path = req.uri().path();
    let query = req.uri().query();

    if path == "/" {
        let agents = self.rt.lock().await.available_agents()?;
        return Ok(json_response(&agents));
    }

    let mut segments = path.split('/').skip(1);
    let Some(agent_id) = segments
        .next()
        .and_then(|raw| raw.parse::<AgentId>().ok())
    else {
        return Ok(reject_404());
    };

    let controller = Controller::new(&self.db);
    if !controller.agent_exists(&agent_id)? {
        return Ok(reject_404());
    }

    // A missing route is served exactly like an empty one (full agent info).
    let route = segments.next().unwrap_or("");

    let mut trunc = segments.fold(String::new(), |mut acc, segment| {
        acc.push('/');
        acc.push_str(segment);
        acc
    });
    if trunc.is_empty() {
        trunc.push('/');
    }
    if let Some(raw_query) = query {
        trunc.push('?');
        trunc.push_str(raw_query);
    }

    let response = match route {
        "state" => {
            let mut runtime = self.rt.lock().await;
            let (status, payload) = runtime.http_get_state(&agent_id, trunc).await?;
            // Any status other than 200 is reported to the client as 404.
            if status == 200 {
                json_body(payload)
            } else {
                reject_404()
            }
        }
        "logs" => {
            let pagination = Pagination::from_query(query).unwrap_or_default();
            let page = controller.logs(agent_id).get_logs_paginated(pagination)?;
            json_response(&page)
        }
        "sinks" => {
            let sinks = controller.agent_sinks(&agent_id)?;
            json_response(&sinks)
        }
        "subs" => {
            let subs = controller.agent_subs(&agent_id)?;
            json_response(&subs)
        }
        "desc" => {
            let desc = controller.agent_desc(&agent_id)?;
            json_response_nested(desc, &trunc)
        }
        "meta" => {
            let meta = controller.agent_meta(&agent_id)?;
            json_response_nested(meta, &trunc)
        }
        "symbols" => {
            let mut runtime = self.rt.lock().await;
            let symbols = runtime.get_symbols(&agent_id).await?;
            json_response_nested(symbols, &trunc)
        }
        "pkg" => match trunc.as_str() {
            "/" => {
                let pkg = controller.agent_pkg_full(&agent_id)?.map(|r| r.into_dto());
                json_response(&pkg)
            }
            "/def" => {
                let def = controller.agent_pkg_def(&agent_id)?.map(|r| r.into_dto());
                json_response(&def)
            }
            "/source" => {
                let source = controller.agent_pkg_source(&agent_id)?;
                json_response(&source)
            }
            _ => reject_404(),
        },
        "" => {
            let full_info = controller.agent_full(&agent_id)?;
            json_response(&full_info)
        }
        _ => reject_404(),
    };
    Ok(response)
}
/// Serves `POST` requests; only `/{agent_id}/action[/{segment}]` is supported.
///
/// * `/` and `/{agent_id}/` are answered with `method_not_allowed()`.
/// * An unparsable agent id is answered with `bad_request(..)`.
/// * A missing or unknown route, or more than one path segment after the
///   route, is answered with `reject_404()`.
/// * An `action` request that fails `check_json_content` is answered with
///   `unsupported_media_type()`.
///
/// On success the action is executed with `self.writer` as executor, the
/// resulting events are passed to the event handler, and both events and
/// action are returned as an [`ActionResp`]. A handler failure is turned
/// into a server-error response.
///
/// # Errors
///
/// Returns `Err` when `set_executor` or `http_post_action` itself fails.
async fn process_post_rq(&self, req: Request) -> crate::Result<Response> {
    let path = req.uri().path();
    if path == "/" {
        return Ok(method_not_allowed());
    }
    let mut pieces = path.split('/').skip(1);
    let aid_str = match pieces.next() {
        Some(s) => s,
        None => return Ok(method_not_allowed()),
    };
    let agent_id: AgentId = match aid_str.parse() {
        Ok(aid) => aid,
        Err(e) => return Ok(bad_request(format!("failed to parse agent-id - {e}"))),
    };
    let route = match pieces.next() {
        Some(r) => r,
        None => return Ok(reject_404()),
    };
    // Everything after the route becomes the sub-path handed to the runtime.
    let mut trunc = String::new();
    let mut cnt = 0;
    for piece in pieces {
        trunc.push('/');
        trunc.push_str(piece);
        cnt += 1;
    }
    // At most one segment may follow the route.
    if cnt > 1 {
        return Ok(reject_404());
    }
    if trunc.is_empty() {
        trunc.push('/');
    }
    if let Some(query) = req.uri().query() {
        trunc.push('?');
        trunc.push_str(query);
    }
    match route {
        "action" => {
            let (parts, payload) = req.into_parts();
            if !check_json_content(&parts) {
                return Ok(unsupported_media_type());
            }
            // Scope the runtime lock so it is released before the event
            // handler runs (the handler may need to lock the runtime itself).
            let (events, action) = {
                let mut rt = self.rt.lock().await;
                rt.set_executor(self.writer)?;
                match rt
                    .http_post_action(&agent_id, trunc, payload.into(), &self.writer)
                    .await?
                {
                    Ok(out) => out,
                    Err((status, err)) => {
                        // The status comes from the runtime and may not be
                        // convertible; answer with a server error instead
                        // of panicking on a bad value.
                        return Ok(match status.try_into() {
                            Ok(code) => err_response(code, err),
                            Err(_) => into_server_error(
                                "agent returned an invalid HTTP status code",
                            ),
                        });
                    }
                }
            };
            match self.event_handler.handle_events(events.clone()).await {
                Ok(_) => {
                    let resp = ActionResp { events, action };
                    Ok(json_response(&resp))
                }
                Err(e) => Ok(into_server_error(e)),
            }
        }
        "" => Ok(method_not_allowed()),
        _ => Ok(reject_404()),
    }
}
}
impl<E, S> Service<Request> for SwAgentService<E, S>
where
S: Db + 'static,
E: EventHandler + 'static,
{
type Response = Response;
type Error = Infallible;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Request) -> Self::Future {
let this = self.clone();
let fut = async move {
let result: Response = match this.process_rq(req).await {
Ok(r) => r,
Err(e) => into_server_error(e),
};
Ok(result)
};
Box::pin(fut)
}
}