use std::net::{IpAddr, SocketAddr};
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use bytes::Bytes;
use futures::future::{Future, TryFutureExt};
use futures::try_join;
use log::debug;
use serde::de::DeserializeOwned;
use tc_error::*;
use tcgeneric::{NetworkTime, TCPathBuf};
use crate::http;
use crate::kernel::Kernel;
use crate::scalar::{Link, LinkHost, LinkProtocol, Value};
use crate::state::State;
use crate::txn::*;
pub struct Config {
pub addr: IpAddr,
pub http_port: u16,
pub request_ttl: Duration,
}
#[async_trait]
pub trait Client {
async fn fetch<T: DeserializeOwned>(
&self,
txn_id: &TxnId,
link: &Link,
key: &Value,
) -> TCResult<T>;
async fn get(&self, txn: Txn, link: Link, key: Value) -> TCResult<State>;
async fn put(&self, txn: Txn, link: Link, key: Value, value: State) -> TCResult<()>;
async fn post(&self, txn: Txn, link: Link, params: State) -> TCResult<State>;
async fn delete(&self, txn: &Txn, link: Link, key: Value) -> TCResult<()>;
}
#[async_trait]
pub trait Server {
type Error: std::error::Error;
async fn listen(self, addr: SocketAddr) -> Result<(), Self::Error>;
}
pub struct Gateway {
config: Config,
kernel: Kernel,
txn_server: TxnServer,
root: LinkHost,
client: http::Client,
actor: Actor,
}
impl Gateway {
pub fn time() -> NetworkTime {
NetworkTime::now()
}
pub fn new(config: Config, kernel: Kernel, txn_server: TxnServer) -> Arc<Self> {
let root = LinkHost::from((
LinkProtocol::HTTP,
config.addr.clone(),
Some(config.http_port),
));
Arc::new(Self {
config,
kernel,
txn_server,
root,
client: http::Client::new(),
actor: Actor::new(Link::default().into()),
})
}
pub fn root(&self) -> &LinkHost {
&self.root
}
pub fn link(&self, path: TCPathBuf) -> Link {
Link::from((self.root.clone(), path))
}
pub async fn new_txn(self: &Arc<Self>, txn_id: TxnId, token: Option<String>) -> TCResult<Txn> {
let token = if let Some(token) = token {
use rjwt::Resolve;
Resolver::new(self, &self.root().clone().into(), &txn_id)
.consume_and_sign(&self.actor, vec![], token, txn_id.time().into())
.map_err(TCError::unauthorized)
.await?
} else {
let token = Token::new(
self.root.clone().into(),
txn_id.time().into(),
self.config.request_ttl,
self.actor.id().clone(),
vec![],
);
let signed = self.actor.sign_token(&token).map_err(TCError::internal)?;
let claims = token.claims();
(signed, claims)
};
self.txn_server.new_txn(self.clone(), txn_id, token).await
}
pub async fn fetch<T: DeserializeOwned>(
&self,
txn_id: &TxnId,
link: &Link,
key: &Value,
) -> TCResult<T> {
self.client.fetch(txn_id, link, key).await
}
pub async fn get(&self, txn: &Txn, link: Link, key: Value) -> TCResult<State> {
debug!("GET {}: {}", link, key);
match link.host() {
None if link.path().is_empty() && key.is_none() => {
let public_key = Bytes::from(self.actor.public_key().as_bytes().to_vec());
Ok(State::from(Value::from(public_key)))
}
None => self.kernel.get(txn, link.path(), key).await,
Some(host) if host == self.root() => self.kernel.get(txn, link.path(), key).await,
_ => self.client.get(txn.clone(), link, key).await,
}
}
pub fn put<'a>(
&'a self,
txn: &'a Txn,
link: Link,
key: Value,
value: State,
) -> Pin<Box<dyn Future<Output = TCResult<()>> + Send + 'a>> {
Box::pin(async move {
debug!("PUT {}: {} <- {}", link, key, value);
match link.host() {
None => self.kernel.put(txn, link.path(), key, value).await,
Some(host) if host == self.root() => {
self.kernel.put(txn, link.path(), key, value).await
}
_ => self.client.put(txn.clone(), link, key, value).await,
}
})
}
pub async fn post(&self, txn: &Txn, link: Link, params: State) -> TCResult<State> {
debug!("POST to {} with params {}", link, params);
match link.host() {
None => self.kernel.post(txn, link.path(), params).await,
Some(host) if host == self.root() => self.kernel.post(txn, link.path(), params).await,
_ => self.client.post(txn.clone(), link, params).await,
}
}
pub async fn delete(&self, txn: &Txn, link: Link, key: Value) -> TCResult<()> {
debug!("DELETE {}: {}", link, key);
match link.host() {
None => self.kernel.delete(txn, link.path(), key).await,
Some(host) if host == self.root() => self.kernel.delete(txn, link.path(), key).await,
_ => self.client.delete(txn, link, key).await,
}
}
pub fn listen(
self: Arc<Self>,
) -> Pin<Box<impl Future<Output = Result<(), Box<dyn std::error::Error>>> + 'static>> {
Box::pin(async move {
match try_join!(self.clone().http_listen(), self.clone().replicate()) {
Ok(_) => Ok(()),
Err(cause) => Err(cause),
}
})
}
async fn replicate(self: Arc<Self>) -> Result<(), Box<dyn std::error::Error>> {
let result = async move {
for cluster in self.kernel.hosted() {
let gateway = self.clone();
if cluster.link().host().is_none() {
continue;
}
log::info!("replicating {}", cluster);
let txn = gateway.new_txn(TxnId::new(Self::time()), None).await?;
let txn = cluster.claim(&txn).await?;
let self_link = txn.link(cluster.link().path().clone());
cluster.add_replica(&txn, self_link).await?;
cluster.distribute_commit(txn).await?;
log::info!("{} is now online", cluster);
}
TCResult::Ok(())
};
match result.await {
Ok(()) => Result::<(), Box<dyn std::error::Error>>::Ok(()),
Err(cause) => {
let e: Box<dyn std::error::Error> = Box::new(cause);
Err(e)
}
}
}
fn http_listen(
self: Arc<Self>,
) -> std::pin::Pin<Box<impl futures::Future<Output = Result<(), Box<dyn std::error::Error>>>>>
{
let http_addr = (self.config.addr, self.config.http_port).into();
let server = crate::http::HTTPServer::new(self);
let listener = server.listen(http_addr).map_err(|e| {
let e: Box<dyn std::error::Error> = Box::new(e);
e
});
Box::pin(listener)
}
}