mod ans;
mod auth;
mod bearer_token;
mod blob;
mod events;
mod files;
mod filters;
mod hyper_serve;
pub mod licensing;
pub(crate) mod macros;
mod node;
mod rejections;
#[cfg(test)]
mod tests;
pub use crate::api::events::service::EventService;
use crate::{
api::{files::FilePinner, hyper_serve::serve_it, licensing::Licensing},
ax_panic, balanced_or,
crypto::{KeyStoreRef, PublicKey},
swarm::{blob_store::BlobStore, event_store_ref::EventStoreRef, BanyanStore},
util::{
formats::{NodeCycleCount, NodeErrorContext},
to_multiaddr,
variable::Reader,
SocketAddrHelper,
},
};
use ax_types::{service::SwarmState, NodeId};
use chrono::{DateTime, Utc};
use crossbeam::channel::Sender;
use futures::future::try_join_all;
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use std::{fmt, sync::Arc};
use warp::{cors, path, Filter, Rejection, Reply};
#[derive(Clone)]
pub struct NodeInfo {
pub node_id: NodeId,
pub key_store: KeyStoreRef,
pub token_validity: u32,
pub cycles: NodeCycleCount,
pub ax_public_key: PublicKey,
pub licensing: Licensing,
pub started_at: DateTime<Utc>,
}
impl NodeInfo {
pub fn new(
node_id: NodeId,
key_store: KeyStoreRef,
cycles: NodeCycleCount,
licensing: Licensing,
started_at: DateTime<Utc>,
) -> Self {
Self {
node_id,
key_store,
cycles,
token_validity: 86400,
ax_public_key: PublicKey::ax_public_key(),
licensing,
started_at,
}
}
}
#[derive(Debug, derive_more::Display)]
pub struct Error(anyhow::Error); impl std::error::Error for Error {}
impl warp::reject::Reject for Error {}
pub fn reject(err: anyhow::Error) -> Rejection {
warp::reject::custom(Error(err))
}
pub type Result<T> = std::result::Result<T, Rejection>;
#[derive(Debug, Deserialize, derive_more::Display)]
pub struct Token(pub(crate) String);
impl From<String> for Token {
fn from(x: String) -> Self {
Self(x)
}
}
#[derive(Debug, Clone, Deserialize, Serialize, Ord, PartialOrd, Eq, PartialEq)]
#[serde(rename_all = "camelCase")]
pub enum AppMode {
Trial,
Signed,
}
pub async fn run(
node_info: NodeInfo,
store: BanyanStore,
event_store: EventStoreRef,
blobs: BlobStore,
bind_to: Arc<Mutex<SocketAddrHelper>>,
snd: Sender<anyhow::Result<()>>,
swarm_state: Reader<SwarmState>,
) {
let event_service = events::service::EventService::new(event_store, node_info.node_id);
let pinner = FilePinner::new(event_service.clone(), store.ipfs().clone());
let api = routes(node_info, store, event_service, pinner, blobs, swarm_state);
#[allow(clippy::needless_collect)]
let addrs = bind_to.lock().iter().collect::<Vec<_>>();
let tasks = addrs
.into_iter()
.map(|i| {
let (addr, task) = serve_it(i, api.clone().boxed()).map_err(move |e| {
e.context(NodeErrorContext::BindFailed {
addr: to_multiaddr(i),
component: "API".into(),
})
})?;
tracing::info!(target: "API_BOUND", "API bound to {}.", addr);
bind_to.lock().inject_bound_addr(i, addr);
Ok(task)
})
.collect::<anyhow::Result<Vec<_>>>();
let tasks = match tasks {
Ok(t) => t,
Err(e) => {
ax_panic!(e);
}
};
let _ = snd.send(Ok(()));
if let Err(e) = try_join_all(tasks).await {
ax_panic!(e);
}
}
fn routes(
node_info: NodeInfo,
store: BanyanStore,
event_service: EventService,
pinner: FilePinner,
blobs: BlobStore,
swarm_state: Reader<SwarmState>,
) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
let events = events::routes(node_info.clone(), event_service);
let node = node::route(node_info.clone(), store.clone(), swarm_state);
let auth = auth::route(node_info.clone());
let files = files::route(store.clone(), node_info.clone(), pinner);
let blob = blob::routes(blobs, node_info.clone());
let api_path = path!("api" / "v2" / ..);
let cors = cors()
.allow_any_origin()
.allow_headers(vec!["accept", "authorization", "content-type"])
.allow_methods(&[http::Method::GET, http::Method::POST, http::Method::PUT]);
let log = warp::log::custom(|info| {
tracing::debug!(
remote_addr=%OptFmt(info.remote_addr()),
method=%info.method(),
path=%info.path(),
version=?info.version(),
status=%info.status().as_u16(),
referer=%OptFmt(info.referer()),
user_agent=%OptFmt(info.user_agent()),
elapsed=?info.elapsed(),
"Processed request"
);
});
balanced_or!(
files::root_serve(store, node_info),
api_path.and(balanced_or!(
path("events").and(events),
path("node").and(node),
path("auth").and(auth),
path("files").and(files),
path("blob").and(blob),
))
)
.recover(|r| async { rejections::handle_rejection(r) })
.with(cors)
.with(log)
}
struct OptFmt<T>(Option<T>);
impl<T: fmt::Display> fmt::Display for OptFmt<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if let Some(ref t) = self.0 {
fmt::Display::fmt(t, f)
} else {
f.write_str("-")
}
}
}