use anyhow::{anyhow, Result};
use futures::{channel::mpsc, prelude::*};
use netidx::{
path::Path,
subscriber::{Event, SubId, Subscriber, UpdatesFlags, Val, Value},
};
use poolshark::global::GPooled;
use std::{
collections::VecDeque,
sync::atomic::{AtomicBool, Ordering},
time::Duration,
};
use tokio::{sync::Mutex, time};
/// Receiving half of a connection: the raw update channel plus a queue of
/// values already extracted from it.
struct Receiver {
/// Channel delivering batches of `(SubId, Event)` pairs.
updates: mpsc::Receiver<GPooled<Vec<(SubId, Event)>>>,
/// Values taken from `Event::Update` events, in arrival order, waiting to
/// be handed to a caller.
queued: VecDeque<Value>,
}
impl Receiver {
/// Move the contents of one channel read into `queued`.
///
/// * `dead` - flag that is raised when the channel yielded `None` or when
///   an `Event::Unsubscribed` is found in the batch.
/// * `r` - the result of reading the channel; `None` means the channel has
///   ended.
///
/// # Errors
///
/// Fails with "connection is dead" when `r` is `None`. A batch that merely
/// contains `Event::Unsubscribed` raises `dead` but still returns `Ok(())`,
/// so that any values in the same batch remain available.
fn fill_from_channel(
    &mut self,
    dead: &AtomicBool,
    r: Option<GPooled<Vec<(SubId, Event)>>>,
) -> Result<()> {
    let Some(mut batch) = r else {
        dead.store(true, Ordering::Relaxed);
        bail!("connection is dead")
    };
    for (_, event) in batch.drain(..) {
        match event {
            Event::Unsubscribed => dead.store(true, Ordering::Relaxed),
            Event::Update(value) => self.queued.push_back(value),
        }
    }
    Ok(())
}
/// Make sure something is queued, waiting on the channel if necessary.
///
/// First takes whatever is immediately available via `try_fill_queue`. If
/// the queue is still empty afterwards, awaits the next batch from the
/// channel and queues it.
///
/// # Errors
///
/// Fails with "connection is dead" if the queue is empty and `dead` is
/// already set, or if the channel ends while waiting. Errors from
/// `try_fill_queue` are passed through.
async fn fill_queue(&mut self, dead: &AtomicBool) -> Result<()> {
    self.try_fill_queue(dead)?;
    if !self.queued.is_empty() {
        return Ok(());
    }
    // Nothing buffered: don't wait on a connection already known to be dead.
    if dead.load(Ordering::Relaxed) {
        bail!("connection is dead")
    }
    let next = self.updates.next().await;
    self.fill_from_channel(dead, next)
}
/// Queue whatever batches are available right now, without waiting.
///
/// Reads at most 10 batches per call and stops at the first `try_recv`
/// error. If queueing a batch fails, the error is returned only when
/// nothing is queued; otherwise reading just stops so the queued values can
/// still be delivered.
///
/// With the `fill_from_channel` defined in this file a `Some` batch never
/// produces an error, so the error handling here is defensive.
///
/// NOTE(review): every `try_recv` error is treated as "nothing available".
/// If that error can also mean the channel is closed, closure is not
/// reported from this path - confirm against the channel's API.
fn try_fill_queue(&mut self, dead: &AtomicBool) -> Result<()> {
    for _ in 0..10 {
        let Ok(batch) = self.updates.try_recv() else {
            break;
        };
        match self.fill_from_channel(dead, Some(batch)) {
            Ok(()) => {}
            Err(e) if self.queued.is_empty() => return Err(e),
            Err(_) => break,
        }
    }
    Ok(())
}
}
/// Client side of a channel connection, created with `Connection::connect`.
///
/// Values are written with `send`/`flush` and read with `recv`, `recv_one`
/// and their `try_` variants.
pub struct Connection {
/// Clone of the subscriber the connection was created from. It is stored
/// but never read by the code in this file.
_subscriber: Subscriber,
/// The subscription that `send` writes to and `flush` flushes.
con: Val,
/// Incoming updates and the queue of received values, behind an async
/// mutex so the receive methods can take `&self`.
receiver: Mutex<Receiver>,
/// Set once the connection is known to be unusable: the update channel
/// ended, an `Event::Unsubscribed` was received, or a flush failed.
dead: AtomicBool,
/// Set by `send`, cleared by `flush`.
dirty: AtomicBool,
}
impl Connection {
/// Subscribe to the connection at `path` and run the opening handshake.
///
/// Subscription is attempted with a 15 second timeout. The errors of the
/// first four attempts are discarded, each followed by a 250 ms pause; the
/// error of the fifth attempt is returned to the caller.
///
/// Handshake, as implemented here:
/// 1. send `"ready"`;
/// 2. wait up to 15 seconds for a value. `"ready"` is answered with `"go"`
///    and completes the handshake; `"connection"` moves on to step 3;
///    anything else is an error;
/// 3. wait up to 15 seconds for `"ready"` and answer it with `"go"`.
///
/// # Errors
///
/// Fails when the final subscription attempt fails, when a handshake step
/// times out, when sending or receiving fails, or when an unexpected value
/// arrives during the handshake.
async fn connect_singleton(subscriber: &Subscriber, path: Path) -> Result<Self> {
    let to = Duration::from_secs(15);
    let (tx, rx) = mpsc::channel(5);
    let f = UpdatesFlags::empty();
    let mut n = 0;
    let con = loop {
        let attempt = subscriber
            .subscribe_nondurable_one_updates(path.clone(), [(f, tx.clone())], Some(to))
            .await;
        match attempt {
            Ok(con) => break con,
            // Retries are used up: report this failure.
            Err(e) if n > 3 => return Err(e.into()),
            Err(_) => {
                n += 1;
                time::sleep(Duration::from_millis(250)).await;
            }
        }
    };
    let con = Connection {
        _subscriber: subscriber.clone(),
        con,
        receiver: Mutex::new(Receiver { updates: rx, queued: VecDeque::new() }),
        dead: AtomicBool::new(false),
        dirty: AtomicBool::new(false),
    };
    con.send(Value::from("ready"))?;
    let first = match time::timeout(to, con.recv_one()).await {
        Err(_) => bail!("timeout waiting for channel handshake"),
        Ok(reply) => reply?,
    };
    match first {
        Value::String(s) if &*s == "connection" => (),
        Value::String(s) if &*s == "ready" => {
            // The peer skipped the "connection" marker; finish right away.
            con.send(Value::from("go"))?;
            return Ok(con);
        }
        v => bail!("channel handshake, expected \"connection\" got {}", v),
    }
    let second = match time::timeout(to, con.recv_one()).await {
        Err(_) => bail!("timeout waiting for channel handshake"),
        Ok(reply) => reply?,
    };
    match second {
        Value::String(s) if &*s == "ready" => con.send(Value::from("go"))?,
        v => bail!("channel handshake, expected \"ready\" got {}", v),
    }
    Ok(con)
}
/// Connect to the channel or connection published at `path`.
///
/// Subscribes to `path` and waits up to 15 seconds for the subscription.
/// The last value seen there decides what happens next:
/// * `"connection"` - `path` is itself a connection; the handshake is run
///   against it directly.
/// * `"channel"` - `"connect"` is written to `path` and the reply (awaited
///   for up to 15 seconds) is cast to the path of the connection to use.
///
/// # Errors
///
/// Fails when the subscription does not complete in time or is already
/// unsubscribed, when `path` holds any other value, when the connect
/// request times out, gets no reply, or gets a reply that is not a string,
/// and when the connection handshake itself fails.
pub async fn connect(subscriber: &Subscriber, path: Path) -> Result<Self> {
    let to = Duration::from_secs(15);
    let acceptor = subscriber.subscribe(path.clone());
    time::timeout(to, acceptor.wait_subscribed()).await??;
    match acceptor.last() {
        Event::Unsubscribed => bail!("connect failed, unsubscribed after connect"),
        Event::Update(Value::String(s)) if &*s == "connection" => {
            Self::connect_singleton(subscriber, path).await
        }
        Event::Update(Value::String(s)) if &*s == "channel" => {
            let receipt = acceptor.write_with_recipt(Value::from("connect"));
            // The outer result is the timeout, the inner one is the receipt
            // itself; keep them apart so each failure gets the right message.
            match time::timeout(to, receipt).await {
                Err(_) => bail!("connect failed, timed out"),
                Ok(Err(_)) => bail!("connect failed, no reply to connect request"),
                Ok(Ok(v @ Value::String(_))) => {
                    let path = v.cast_to::<Path>()?;
                    Self::connect_singleton(subscriber, path).await
                }
                Ok(Ok(_)) => bail!("unexpected response from publisher"),
            }
        }
        Event::Update(_) => bail!("not a channel or connection"),
    }
}
/// Return `true` once the connection has been marked dead.
pub fn is_dead(&self) -> bool {
self.dead.load(Ordering::Relaxed)
}
/// Fail with "connection is dead" if the connection has been marked dead,
/// otherwise return `Ok(())`.
fn check_dead(&self) -> Result<()> {
    if self.is_dead() {
        bail!("connection is dead")
    }
    Ok(())
}
/// Write `v` to the connection and mark the connection dirty.
///
/// This does not flush; see `flush`.
///
/// # Errors
///
/// Fails with "connection is dead" if the connection has been marked dead,
/// in which case nothing is written and the dirty flag is left untouched.
pub fn send(&self, v: Value) -> Result<()> {
    self.check_dead()?;
    self.dirty.store(true, Ordering::Relaxed);
    self.con.write(v);
    Ok(())
}
/// Return `true` if `send` has been called since the last `flush`.
pub fn dirty(&self) -> bool {
self.dirty.load(Ordering::Relaxed)
}
/// Flush the underlying connection and clear the dirty flag.
///
/// The dirty flag is cleared whether or not the flush succeeds.
///
/// # Errors
///
/// Fails with "connection is dead" if the connection was already marked
/// dead, or if the flush itself fails - in which case the connection is
/// marked dead as well.
pub async fn flush(&self) -> Result<()> {
    self.check_dead()?;
    let outcome = match self.con.flush().await {
        Ok(done) => Ok(done),
        Err(_) => {
            self.dead.store(true, Ordering::Relaxed);
            Err(anyhow!("connection is dead"))
        }
    };
    self.dirty.store(false, Ordering::Relaxed);
    outcome
}
/// Wait for and return the next value.
///
/// Values already queued are returned first. When the queue is empty the
/// receiver waits for more to arrive.
///
/// # Errors
///
/// Fails with "connection is dead" when the queue is empty and the
/// connection has been marked dead, or when refilling the queue fails.
pub async fn recv_one(&self) -> Result<Value> {
    let mut guard = self.receiver.lock().await;
    loop {
        if let Some(value) = guard.queued.pop_front() {
            return Ok(value);
        }
        self.check_dead()?;
        guard.fill_queue(&self.dead).await?;
    }
}
/// Return the next value if one is available without waiting for updates.
///
/// Returns `Ok(Some(v))` when a value is queued or can be read from the
/// channel immediately, and `Ok(None)` when nothing is available yet.
///
/// # Errors
///
/// Fails with "connection is dead" when nothing is available and the
/// connection has been marked dead, matching `recv_one`. Without this check
/// a caller polling a dead connection would receive `Ok(None)` forever.
/// Values queued before the connection died are still returned first.
pub async fn try_recv_one(&self) -> Result<Option<Value>> {
    let mut recv = self.receiver.lock().await;
    if recv.queued.is_empty() {
        recv.try_fill_queue(&self.dead)?;
    }
    match recv.queued.pop_front() {
        Some(v) => Ok(Some(v)),
        None => {
            self.check_dead()?;
            Ok(None)
        }
    }
}
/// Wait until at least one value is available, then move every queued
/// value into `dst`.
///
/// Anything immediately readable from the channel is queued first, so a
/// single call can deliver several batches at once.
///
/// # Errors
///
/// Fails with "connection is dead" when the queue is empty and the
/// connection has been marked dead, or when filling the queue fails.
/// `dst` is left untouched in that case.
pub async fn recv(&self, dst: &mut impl Extend<Value>) -> Result<()> {
    let mut guard = self.receiver.lock().await;
    guard.try_fill_queue(&self.dead)?;
    while guard.queued.is_empty() {
        self.check_dead()?;
        guard.fill_queue(&self.dead).await?;
    }
    dst.extend(guard.queued.drain(..));
    Ok(())
}
/// Move every value that is available without waiting for updates into
/// `dst`.
///
/// Returns `Ok(true)` if at least one value was added to `dst`, and
/// `Ok(false)` if nothing is available yet.
///
/// # Errors
///
/// Fails with "connection is dead" when nothing is available and the
/// connection has been marked dead, matching `recv`. Without this check a
/// caller polling a dead connection would receive `Ok(false)` forever.
/// Values queued before the connection died are still delivered first.
pub async fn try_recv(&self, dst: &mut impl Extend<Value>) -> Result<bool> {
    let mut recv = self.receiver.lock().await;
    recv.try_fill_queue(&self.dead)?;
    if recv.queued.is_empty() {
        self.check_dead()?;
        return Ok(false);
    }
    dst.extend(recv.queued.drain(..));
    Ok(true)
}
}