use crate::data::*;
use crate::{Tagged, Tagger};
use async_bincode::{AsyncBincodeStream, AsyncDestination};
use futures_util::{
future, future::TryFutureExt, ready, stream::futures_unordered::FuturesUnordered,
stream::StreamExt, stream::TryStreamExt,
};
use nom_sql::ColumnSpecification;
use petgraph::graph::NodeIndex;
use std::collections::HashMap;
use std::fmt;
use std::future::Future;
use std::io;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use tokio_tower::multiplex;
use tower_balance::p2c::Balance;
use tower_buffer::Buffer;
use tower_discover::ServiceStream;
use tower_limit::concurrency::ConcurrencyLimit;
use tower_service::Service;
/// Wire transport for view reads: a bincode-framed TCP stream that sends
/// `Tagged<ReadQuery>` requests and receives `Tagged<ReadReply>` responses.
type Transport = AsyncBincodeStream<
    tokio::net::TcpStream,
    Tagged<ReadReply>,
    Tagged<ReadQuery>,
    AsyncDestination,
>;
/// A connector service that dials the view server at the wrapped address.
#[derive(Debug)]
struct Endpoint(SocketAddr);
/// A single multiplexed client connection to one view-server endpoint,
/// with request/response correlation handled by `Tagger`.
type InnerService = multiplex::Client<
    multiplex::MultiplexTransport<Transport, Tagger>,
    tokio_tower::Error<multiplex::MultiplexTransport<Transport, Tagger>, Tagged<ReadQuery>>,
    Tagged<ReadQuery>,
>;
impl Service<()> for Endpoint {
    type Response = InnerService;
    type Error = tokio::io::Error;

    // rustdoc cannot name the opaque `impl Future` type, so a mock stands in
    // when building documentation.
    #[cfg(not(doc))]
    type Future = impl Future<Output = Result<Self::Response, Self::Error>>;
    #[cfg(doc)]
    type Future = crate::doc_mock::Future<Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Dialing has no backpressure of its own — always ready.
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, _: ()) -> Self::Future {
        // Create the connect future *outside* the async block so the returned
        // future owns it and does not borrow `self`.
        let connect = tokio::net::TcpStream::connect(self.0);
        async move {
            let stream = connect.await?;
            // Reads are latency-sensitive; disable Nagle so small requests go
            // out immediately.
            stream.set_nodelay(true)?;
            let framed = AsyncBincodeStream::from(stream).for_async();
            let transport = multiplex::MultiplexTransport::new(framed, Tagger::default());
            Ok(multiplex::Client::with_error_handler(transport, |e| {
                eprintln!("view server went away: {}", e)
            }))
        }
    }
}
/// Start `VIEW_POOL_SIZE` connection attempts to `addr`, yielding each
/// established connection as a `Change::Insert` as soon as it completes.
fn make_views_stream(
    addr: SocketAddr,
) -> impl futures_util::stream::TryStream<
    Ok = tower_discover::Change<usize, InnerService>,
    Error = tokio::io::Error,
> {
    // Completion order does not matter, so let FuturesUnordered surface the
    // connections as they finish.
    let connects = futures_util::stream::FuturesUnordered::new();
    for i in 0..crate::VIEW_POOL_SIZE {
        connects.push(async move {
            let svc = Endpoint(addr).call(()).await?;
            Ok(tower_discover::Change::Insert(i, svc))
        });
    }
    connects
}
/// Wrap the connection stream for `addr` in a `Discover` so the load
/// balancer can pick up pool members as they come online.
fn make_views_discover(addr: SocketAddr) -> Discover {
    ServiceStream::new(make_views_stream(addr))
}
// rustdoc cannot render the opaque `impl Trait` alias, so a mock stands in
// when building documentation.
#[cfg(not(doc))]
type Discover = impl tower_discover::Discover<Key = usize, Service = InnerService, Error = tokio::io::Error>
    + Unpin
    + Send;
#[cfg(doc)]
type Discover = crate::doc_mock::Discover<InnerService>;
/// The handle used for all reads against one shard: a buffered,
/// concurrency-limited, load-balanced pool of multiplexed connections.
pub(crate) type ViewRpc =
    Buffer<ConcurrencyLimit<Balance<Discover, Tagged<ReadQuery>>>, Tagged<ReadQuery>>;
/// Errors that can occur while reading from a view.
#[derive(Debug, Fail)]
pub enum ViewError {
    /// The view is not (yet) able to serve reads.
    #[fail(display = "the view is not yet available")]
    NotYetAvailable,
    /// A lower-level transport error occurred while talking to the server.
    #[fail(display = "{}", _0)]
    TransportError(#[cause] failure::Error),
}
impl From<Box<dyn std::error::Error + Send + Sync>> for ViewError {
fn from(e: Box<dyn std::error::Error + Send + Sync>) -> Self {
ViewError::TransportError(failure::Error::from_boxed_compat(e))
}
}
#[doc(hidden)]
#[derive(Serialize, Deserialize, Debug)]
pub enum ReadQuery {
    /// Read the rows for the given keys from the reader identified by
    /// `(node, shard)`; if `block` is set, wait for missing keys rather than
    /// failing.
    Normal {
        target: (NodeIndex, usize),
        keys: Vec<Vec<DataType>>,
        block: bool,
    },
    /// Ask the reader identified by `(node, shard)` for its row count.
    Size {
        target: (NodeIndex, usize),
    },
}
#[doc(hidden)]
#[derive(Serialize, Deserialize, Debug)]
pub enum ReadReply {
    /// One result set per requested key, or `Err(())` when the view could not
    /// serve the read (mapped to `ViewError::NotYetAvailable` by the caller).
    Normal(Result<Vec<Vec<Vec<DataType>>>, ()>),
    /// The reader's current number of rows.
    Size(usize),
}
/// Serializable description of a `View`: everything needed to build a read
/// handle to it from another process.
#[doc(hidden)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ViewBuilder {
    pub node: NodeIndex,
    pub columns: Vec<String>,
    pub schema: Option<Vec<ColumnSpecification>>,
    // One address per shard, in shard order.
    pub shards: Vec<SocketAddr>,
}
impl ViewBuilder {
    /// Construct a `View` from this description.
    ///
    /// `rpcs` is a shared cache of connection pools keyed by
    /// `(address, shard index)`: the first build against a shard creates the
    /// pool (and spawns a worker task to drive it); later builds reuse the
    /// cached handle, so all `View`s to the same shard share one pool.
    #[doc(hidden)]
    pub fn build(
        &self,
        rpcs: Arc<Mutex<HashMap<(SocketAddr, usize), ViewRpc>>>,
    ) -> Result<View, io::Error> {
        let node = self.node;
        let columns = self.columns.clone();
        let shards = self.shards.clone();
        let schema = self.schema.clone();
        let mut addrs = Vec::with_capacity(shards.len());
        let mut conns = Vec::with_capacity(shards.len());
        for (shardi, &addr) in shards.iter().enumerate() {
            use std::collections::hash_map::Entry;
            addrs.push(addr);
            // The lock is acquired per shard and dropped at the end of each
            // iteration, so pool creation for one shard doesn't hold it longer
            // than necessary.
            let mut rpcs = rpcs.lock().unwrap();
            let s = match rpcs.entry((addr, shardi)) {
                Entry::Occupied(e) => e.get().clone(),
                Entry::Vacant(h) => {
                    // First handle for this shard: build the buffered,
                    // concurrency-limited, load-balanced connection pool.
                    let (c, w) = Buffer::pair(
                        ConcurrencyLimit::new(
                            Balance::from_entropy(make_views_discover(addr)),
                            crate::PENDING_LIMIT,
                        ),
                        crate::BUFFER_TO_POOL,
                    );
                    use tracing_futures::Instrument;
                    // The Buffer worker must be driven for any request to make
                    // progress; run it as its own instrumented task.
                    tokio::spawn(w.instrument(tracing::debug_span!(
                        "view_worker",
                        addr = %addr,
                        shard = shardi
                    )));
                    h.insert(c.clone());
                    c
                }
            };
            conns.push(s);
        }
        // Capture the currently-active tracing dispatcher to store on the View.
        let tracer = tracing::dispatcher::get_default(|d| d.clone());
        Ok(View {
            node,
            schema,
            columns: Arc::from(columns),
            shard_addrs: addrs,
            shards: conns,
            tracer,
        })
    }
}
/// A handle for reading from a (possibly sharded) materialized view.
#[derive(Clone)]
pub struct View {
    node: NodeIndex,
    columns: Arc<[String]>,
    schema: Option<Vec<ColumnSpecification>>,
    // One RPC pool handle per shard, in shard order.
    shards: Vec<ViewRpc>,
    shard_addrs: Vec<SocketAddr>,
    // Tracing dispatcher captured at build time. NOTE(review): not read by any
    // code visible in this file — confirm its use elsewhere before removing.
    tracer: tracing::Dispatch,
}
impl fmt::Debug for View {
    /// Debug output deliberately omits `schema`, `shards`, and `tracer` —
    /// only the identifying fields are printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut dbg = f.debug_struct("View");
        dbg.field("node", &self.node);
        dbg.field("columns", &self.columns);
        dbg.field("shard_addrs", &self.shard_addrs);
        dbg.finish()
    }
}
pub(crate) mod results;
use self::results::{Results, Row};
impl Service<(Vec<Vec<DataType>>, bool)> for View {
    type Response = Vec<Results>;
    type Error = ViewError;
    // rustdoc cannot name the opaque future type; a mock stands in for docs.
    #[cfg(not(doc))]
    type Future = impl Future<Output = Result<Self::Response, Self::Error>> + Send;
    #[cfg(doc)]
    type Future = crate::doc_mock::Future<Result<Self::Response, Self::Error>>;
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // A request may need to contact any shard, so only report ready when
        // every shard's pool is ready. NOTE(review): each successful
        // poll_ready reserves a Buffer slot for that shard; unused
        // reservations are released in call() below — confirm against the
        // tower-buffer version in use.
        for s in &mut self.shards {
            ready!(s.poll_ready(cx)).map_err(ViewError::from)?;
        }
        Poll::Ready(Ok(()))
    }
    fn call(&mut self, (keys, block): (Vec<Vec<DataType>>, bool)) -> Self::Future {
        // Only create a span if this operation was sampled for tracing.
        let span = if crate::trace_next_op() {
            Some(tracing::trace_span!(
                "view-request",
                ?keys,
                node = self.node.index()
            ))
        } else {
            None
        };
        let columns = Arc::clone(&self.columns);
        if self.shards.len() == 1 {
            // Fast path: unsharded view — send all keys to the single shard.
            let request = Tagged::from(ReadQuery::Normal {
                target: (self.node, 0),
                keys,
                block,
            });
            let _guard = span.as_ref().map(tracing::Span::enter);
            tracing::trace!("submit request");
            return future::Either::Left(
                self.shards[0]
                    .call(request)
                    .map_err(ViewError::from)
                    .and_then(move |reply| async move {
                        match reply.v {
                            ReadReply::Normal(Ok(rows)) => Ok(rows
                                .into_iter()
                                .map(|rows| Results::new(rows, Arc::clone(&columns)))
                                .collect()),
                            // The server signalled it cannot serve this read yet.
                            ReadReply::Normal(Err(())) => Err(ViewError::NotYetAvailable),
                            _ => unreachable!(),
                        }
                    }),
            );
        }
        if let Some(ref span) = span {
            span.in_scope(|| tracing::trace!("shard request"));
        }
        // Sharded reads only support single-column keys (shard_by hashes key[0]).
        assert!(keys.iter().all(|k| k.len() == 1));
        // Bucket each key into the shard that owns it.
        let mut shard_queries = vec![Vec::new(); self.shards.len()];
        for key in keys {
            let shard = crate::shard_by(&key[0], self.shards.len());
            shard_queries[shard].push(key);
        }
        let node = self.node;
        future::Either::Right(
            self.shards
                .iter_mut()
                .enumerate()
                .zip(shard_queries.into_iter())
                .filter_map(|((shardi, shard), shard_queries)| {
                    if shard_queries.is_empty() {
                        // This shard gets no request, but poll_ready reserved a
                        // slot in its Buffer. Replacing the handle with a clone
                        // drops the old handle and with it the reservation.
                        // NOTE(review): relies on tower-buffer releasing the
                        // slot when the reserving handle is dropped — confirm.
                        *shard = shard.clone();
                        None
                    } else {
                        Some(((shardi, shard), shard_queries))
                    }
                })
                .map(move |((shardi, shard), shard_queries)| {
                    let request = Tagged::from(ReadQuery::Normal {
                        target: (node, shardi),
                        keys: shard_queries,
                        block,
                    });
                    // Enter the request-level span, then open a per-shard child
                    // span; both guards stay alive while the request is issued.
                    let _guard = span.as_ref().map(tracing::Span::enter);
                    let span = if span.is_some() {
                        Some(tracing::trace_span!("view-shard", shardi))
                    } else {
                        None
                    };
                    let _guard = span.as_ref().map(tracing::Span::enter);
                    tracing::trace!("submit request shard");
                    shard
                        .call(request)
                        .map_err(ViewError::from)
                        .and_then(|reply| async move {
                            match reply.v {
                                ReadReply::Normal(Ok(rows)) => Ok(rows),
                                ReadReply::Normal(Err(())) => Err(ViewError::NotYetAvailable),
                                _ => unreachable!(),
                            }
                        })
                })
                // Gather per-shard responses as they complete, concatenate the
                // per-key result sets, and wrap each in `Results`.
                .collect::<FuturesUnordered<_>>()
                .try_concat()
                .map_ok(move |rows| {
                    rows.into_iter()
                        .map(|rows| Results::new(rows, Arc::clone(&columns)))
                        .collect()
                }),
        )
    }
}
#[allow(clippy::len_without_is_empty)]
impl View {
    /// Names of the columns this view exposes.
    pub fn columns(&self) -> &[String] {
        self.columns.as_ref()
    }

    /// Column schema of the view, if one was recorded at build time.
    pub fn schema(&self) -> Option<&[ColumnSpecification]> {
        self.schema.as_deref()
    }

    /// Total number of rows in the view, summed across all of its shards.
    pub async fn len(&mut self) -> Result<usize, ViewError> {
        // Wait until every shard connection can accept a request.
        future::poll_fn(|cx| self.poll_ready(cx)).await?;
        let node = self.node;
        // Issue one Size query per shard and collect replies as they arrive.
        let mut pending = self
            .shards
            .iter_mut()
            .enumerate()
            .map(|(shard_index, rpc)| {
                rpc.call(Tagged::from(ReadQuery::Size {
                    target: (node, shard_index),
                }))
            })
            .collect::<FuturesUnordered<_>>();
        let mut total = 0;
        while let Some(reply) = pending.try_next().await? {
            match reply.v {
                ReadReply::Size(rows) => total += rows,
                // Size queries only ever produce Size replies.
                _ => unreachable!(),
            }
        }
        Ok(total)
    }

    /// Look up many keys at once; if `block` is set, wait for missing keys.
    pub async fn multi_lookup(
        &mut self,
        keys: Vec<Vec<DataType>>,
        block: bool,
    ) -> Result<Vec<Results>, ViewError> {
        future::poll_fn(|cx| self.poll_ready(cx)).await?;
        self.call((keys, block)).await
    }

    /// Look up a single key and return all matching rows.
    pub async fn lookup(&mut self, key: &[DataType], block: bool) -> Result<Results, ViewError> {
        let mut results = self.multi_lookup(vec![key.to_vec()], block).await?;
        // Exactly one key was requested, so exactly one result set comes back.
        Ok(results.swap_remove(0))
    }

    /// Look up a single key and return only the first matching row, if any.
    pub async fn lookup_first(
        &mut self,
        key: &[DataType],
        block: bool,
    ) -> Result<Option<Row>, ViewError> {
        let mut results = self.multi_lookup(vec![key.to_vec()], block).await?;
        Ok(results.swap_remove(0).into_iter().next())
    }
}