use crate::channel::client;
use anyhow::Result;
use bytes::{Buf, Bytes, BytesMut};
use netidx::{
pack::Pack,
path::Path,
subscriber::{Subscriber, Value},
};
use parking_lot::Mutex;
use std::mem;
use tokio::sync::Mutex as AsyncMutex;
pub struct Batch {
data: BytesMut,
}
impl Batch {
pub fn queue<S: Pack>(&mut self, t: &S) -> Result<()> {
Ok(Pack::encode(t, &mut self.data)?)
}
pub fn len(&self) -> usize {
self.data.len()
}
}
pub struct Connection {
inner: client::Connection,
buf: Mutex<BytesMut>,
queue: AsyncMutex<Bytes>,
}
impl Connection {
pub async fn connect(subscriber: &Subscriber, path: Path) -> Result<Connection> {
let inner = client::Connection::connect(subscriber, path).await?;
Ok(Connection {
inner,
buf: Mutex::new(BytesMut::new()),
queue: AsyncMutex::new(Bytes::new()),
})
}
pub fn is_dead(&self) -> bool {
self.inner.is_dead()
}
pub fn start_batch(&self) -> Batch {
Batch { data: mem::replace(&mut *self.buf.lock(), BytesMut::new()) }
}
pub fn send(&self, mut batch: Batch) -> Result<()> {
let v = Value::from(batch.data.split().freeze());
self.inner.send(v)?;
Ok(*self.buf.lock() = batch.data)
}
pub fn send_one<S: Pack>(&self, t: &S) -> Result<()> {
let mut b = self.start_batch();
b.queue(t)?;
self.send(b)
}
pub fn dirty(&self) -> bool {
self.inner.dirty()
}
pub async fn flush(&self) -> Result<()> {
self.inner.flush().await
}
async fn try_fill_queue(&self, queue: &mut Bytes) -> Result<()> {
if !queue.has_remaining() {
match self.inner.try_recv_one().await? {
Some(Value::Bytes(buf)) => *queue = (*buf).clone(),
Some(v) => bail!("unexpected response {}", v),
None => (),
}
}
Ok(())
}
async fn fill_queue(&self, queue: &mut Bytes) -> Result<()> {
if !queue.has_remaining() {
match self.inner.recv_one().await? {
Value::Bytes(buf) => *queue = (*buf).clone(),
v => bail!("unexpected response {}", v),
}
}
Ok(())
}
pub async fn recv<R: Pack + 'static, F: FnMut(R) -> bool>(
&self,
mut f: F,
) -> Result<()> {
let mut queue = self.queue.lock().await;
self.fill_queue(&mut *queue).await?;
loop {
if queue.has_remaining() {
if !f(Pack::decode(&mut *queue)?) {
break;
}
} else {
self.try_fill_queue(&mut *queue).await?;
if !queue.has_remaining() {
break;
}
}
}
Ok(())
}
pub async fn try_recv<R: Pack + 'static, F: FnMut(R) -> bool>(
&self,
mut f: F,
) -> Result<()> {
let mut queue = self.queue.lock().await;
self.try_fill_queue(&mut *queue).await?;
loop {
if queue.has_remaining() {
if !f(Pack::decode(&mut *queue)?) {
break;
}
} else {
self.try_fill_queue(&mut *queue).await?;
if !queue.has_remaining() {
break;
}
}
}
Ok(())
}
pub async fn recv_one<R: Pack + 'static>(&self) -> Result<R> {
let mut queue = self.queue.lock().await;
self.fill_queue(&mut *queue).await?;
Ok(<R as Pack>::decode(&mut *queue)?)
}
pub async fn try_recv_one<R: Pack + 'static>(&self) -> Result<Option<R>> {
let mut queue = self.queue.lock().await;
self.try_fill_queue(&mut *queue).await?;
if queue.remaining() > 0 {
Ok(Some(Pack::decode(&mut *queue)?))
} else {
Ok(None)
}
}
}