use crate::consensus::{self, Authority};
use crate::debug::stats;
use crate::table::{Table, TableBuilder, TableRpc};
use crate::view::{View, ViewBuilder, ViewRpc};
use crate::ActivationResult;
use failure::{self, ResultExt};
use futures_util::future;
use petgraph::graph::NodeIndex;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap};
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::{
future::Future,
task::{Context, Poll},
};
use tower_buffer::Buffer;
use tower_service::Service;
#[derive(Clone, Serialize, Deserialize)]
#[doc(hidden)]
pub struct ControllerDescriptor {
pub external_addr: SocketAddr,
pub worker_addr: SocketAddr,
pub domain_addr: SocketAddr,
pub nonce: u64,
}
struct Controller<A> {
authority: Arc<A>,
client: hyper::Client<hyper::client::HttpConnector>,
}
#[derive(Debug)]
struct ControllerRequest {
path: &'static str,
request: Vec<u8>,
}
impl ControllerRequest {
fn new<Q: Serialize>(path: &'static str, r: Q) -> Result<Self, serde_json::Error> {
Ok(ControllerRequest {
path,
request: serde_json::to_vec(&r)?,
})
}
}
impl<A> Service<ControllerRequest> for Controller<A>
where
A: 'static + Authority,
{
type Response = hyper::body::Bytes;
type Error = failure::Error;
#[cfg(not(doc))]
type Future = impl Future<Output = Result<Self::Response, Self::Error>> + Send;
#[cfg(doc)]
type Future = crate::doc_mock::Future<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: ControllerRequest) -> Self::Future {
let client = self.client.clone();
let auth = self.authority.clone();
let path = req.path;
let body = req.request;
async move {
let mut url = None;
loop {
if url.is_none() {
let descriptor: ControllerDescriptor = serde_json::from_slice(
&auth.get_leader().context("failed to get current leader")?.1,
)
.context("failed to deserialize authority reply")?;
url = Some(format!("http://{}/{}", descriptor.external_addr, path));
}
let r = hyper::Request::post(url.as_ref().unwrap())
.body(hyper::Body::from(body.clone()))
.unwrap();
let res = client
.request(r)
.await
.map_err(|he| failure::Error::from(he).context("hyper request failed"))?;
let status = res.status();
let body = hyper::body::to_bytes(res.into_body())
.await
.map_err(|he| failure::Error::from(he).context("hyper response failed"))?;
match status {
hyper::StatusCode::OK => return Ok(body),
hyper::StatusCode::INTERNAL_SERVER_ERROR => bail!(
"rpc call to {} failed: {}",
path,
String::from_utf8_lossy(&*body)
),
s => {
if s == hyper::StatusCode::SERVICE_UNAVAILABLE {
url = None;
}
tokio::time::delay_for(Duration::from_millis(100)).await;
}
}
}
}
}
}
pub struct ControllerHandle<A>
where
A: 'static + Authority,
{
handle: Buffer<Controller<A>, ControllerRequest>,
domains: Arc<Mutex<HashMap<(SocketAddr, usize), TableRpc>>>,
views: Arc<Mutex<HashMap<(SocketAddr, usize), ViewRpc>>>,
tracer: tracing::Dispatch,
}
impl<A> Clone for ControllerHandle<A>
where
A: 'static + Authority,
{
fn clone(&self) -> Self {
ControllerHandle {
handle: self.handle.clone(),
domains: self.domains.clone(),
views: self.views.clone(),
tracer: self.tracer.clone(),
}
}
}
impl ControllerHandle<consensus::ZookeeperAuthority> {
pub async fn from_zk(zookeeper_address: &str) -> Result<Self, failure::Error> {
let auth = consensus::ZookeeperAuthority::new(zookeeper_address)?;
ControllerHandle::new(auth).await
}
}
#[cfg(not(doc))]
type RpcFuture<A, R> = impl Future<Output = Result<R, failure::Error>>;
#[cfg(doc)]
type RpcFuture<A, R> = crate::doc_mock::FutureWithExtra<Result<R, failure::Error>, A>;
async fn finalize<R, E>(
fut: impl Future<Output = Result<hyper::body::Bytes, E>>,
err: &'static str,
) -> Result<R, failure::Error>
where
for<'de> R: Deserialize<'de>,
E: std::fmt::Display + Send + Sync + 'static,
{
let body: hyper::body::Bytes = fut.await.map_err(failure::Context::new).context(err)?;
serde_json::from_slice::<R>(&body)
.context("failed to response")
.context(err)
.map_err(failure::Error::from)
}
impl<A: Authority + 'static> ControllerHandle<A> {
#[doc(hidden)]
pub async fn make(authority: Arc<A>) -> Result<Self, failure::Error> {
let tracer = tracing::dispatcher::get_default(|d| d.clone());
Ok(ControllerHandle {
views: Default::default(),
domains: Default::default(),
handle: Buffer::new(
Controller {
authority,
client: hyper::Client::new(),
},
1,
),
tracer,
})
}
pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), failure::Error>> {
self.handle
.poll_ready(cx)
.map_err(failure::Error::from_boxed_compat)
}
pub async fn ready(&mut self) -> Result<(), failure::Error> {
future::poll_fn(move |cx| self.poll_ready(cx)).await
}
pub async fn new(authority: A) -> Result<Self, failure::Error>
where
A: Send + 'static,
{
Self::make(Arc::new(authority)).await
}
pub fn inputs(
&mut self,
) -> impl Future<Output = Result<BTreeMap<String, NodeIndex>, failure::Error>> {
let fut = self
.handle
.call(ControllerRequest::new("inputs", &()).unwrap());
async move {
let body: hyper::body::Bytes = fut
.await
.map_err(failure::Context::new)
.context("failed to fetch inputs")?;
serde_json::from_slice(&body)
.context("couldn't parse input response")
.map_err(failure::Error::from)
}
}
pub fn outputs(
&mut self,
) -> impl Future<Output = Result<BTreeMap<String, NodeIndex>, failure::Error>> {
let fut = self
.handle
.call(ControllerRequest::new("outputs", &()).unwrap());
async move {
let body: hyper::body::Bytes = fut
.await
.map_err(failure::Context::new)
.context("failed to fetch outputs")?;
serde_json::from_slice(&body)
.context("couldn't parse output response")
.map_err(failure::Error::from)
}
}
pub fn view(&mut self, name: &str) -> impl Future<Output = Result<View, failure::Error>> {
#[cfg(debug_assertions)]
assert_infrequent::at_most(200);
let views = self.views.clone();
let name = name.to_string();
let fut = self
.handle
.call(ControllerRequest::new("view_builder", &name).unwrap());
async move {
let body: hyper::body::Bytes = fut
.await
.map_err(failure::Context::new)
.context("failed to fetch view builder")?;
match serde_json::from_slice::<Option<ViewBuilder>>(&body) {
Ok(Some(vb)) => Ok(vb.build(views)?),
Ok(None) => Err(failure::err_msg("view does not exist")),
Err(e) => Err(failure::Error::from(e)),
}
.map_err(move |e| e.context(format!("building view for {}", name)).into())
}
}
pub fn table(&mut self, name: &str) -> impl Future<Output = Result<Table, failure::Error>> {
#[cfg(debug_assertions)]
assert_infrequent::at_most(200);
let domains = self.domains.clone();
let name = name.to_string();
let fut = self
.handle
.call(ControllerRequest::new("table_builder", &name).unwrap());
async move {
let body: hyper::body::Bytes = fut
.await
.map_err(failure::Context::new)
.context("failed to fetch table builder")?;
match serde_json::from_slice::<Option<TableBuilder>>(&body) {
Ok(Some(tb)) => Ok(tb.build(domains)?),
Ok(None) => Err(failure::err_msg("view table not exist")),
Err(e) => Err(failure::Error::from(e)),
}
.map_err(move |e| e.context(format!("building table for {}", name)).into())
}
}
#[doc(hidden)]
pub fn rpc<Q: Serialize, R: 'static>(
&mut self,
path: &'static str,
r: Q,
err: &'static str,
) -> RpcFuture<A, R>
where
for<'de> R: Deserialize<'de>,
R: Send,
{
let fut = self.handle.call(ControllerRequest::new(path, r).unwrap());
finalize(fut, err)
}
pub fn statistics(
&mut self,
) -> impl Future<Output = Result<stats::GraphStats, failure::Error>> {
self.rpc("get_statistics", (), "failed to get stats")
}
pub fn flush_partial(&mut self) -> impl Future<Output = Result<(), failure::Error>> {
self.rpc("flush_partial", (), "failed to flush partial")
}
pub fn extend_recipe(
&mut self,
recipe_addition: &str,
) -> impl Future<Output = Result<ActivationResult, failure::Error>> {
self.rpc("extend_recipe", recipe_addition, "failed to extend recipe")
}
pub fn install_recipe(
&mut self,
new_recipe: &str,
) -> impl Future<Output = Result<ActivationResult, failure::Error>> {
self.rpc("install_recipe", new_recipe, "failed to install recipe")
}
pub fn graphviz(&mut self) -> impl Future<Output = Result<String, failure::Error>> {
self.rpc("graphviz", (), "failed to fetch graphviz output")
}
pub fn simple_graphviz(&mut self) -> impl Future<Output = Result<String, failure::Error>> {
self.rpc(
"simple_graphviz",
(),
"failed to fetch simple graphviz output",
)
}
pub fn remove_node(
&mut self,
view: NodeIndex,
) -> impl Future<Output = Result<(), failure::Error>> {
self.rpc("remove_node", view, "failed to remove node")
}
}