mod server;
pub use crate::protocol::{
publisher::Id,
value::{FromValue, Typ, Value},
};
pub use crate::resolver_client::DesiredAuth;
use crate::{
config::Config,
path::Path,
protocol::{publisher, resolver::UserInfo},
resolver_client::ResolverWrite,
resolver_server::auth::Permissions,
tls,
utils::{self, ChanId, ChanWrap},
};
use anyhow::{anyhow, Error, Result};
use futures::{
channel::{
mpsc::{unbounded, Sender, UnboundedReceiver, UnboundedSender},
oneshot,
},
prelude::*,
stream::FusedStream,
};
use fxhash::{FxHashMap, FxHashSet};
use if_addrs::get_if_addrs;
use log::{info, warn};
use netidx_netproto::resolver::PublisherPriority;
use parking_lot::Mutex;
use poolshark::global::{GPooled, Pool};
use rand::{self, RngExt};
use std::{
boxed::Box,
collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet},
convert::{From, Into, TryInto},
default::Default,
fmt, iter, mem,
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs},
pin::Pin,
result,
str::FromStr,
sync::{Arc, LazyLock, Weak},
time::Duration,
};
use tokio::{net::TcpListener, task};
/// Specifies how a publisher binds its listening socket, and which
/// address it advertises to the resolver (see `BindCfg::select`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BindCfg {
    /// Bind to 127.0.0.1 on a randomly scanned port.
    Local,
    /// Bind to the unique local interface whose address falls inside
    /// `addr`/`netmask`, on a randomly scanned port.
    Match { addr: IpAddr, netmask: IpAddr },
    /// Advertise `public` to the resolver while actually binding to the
    /// local interface matching `private`/`netmask` (the advertised
    /// address may differ from the bound one, e.g. with port forwarding).
    Elastic { public: IpAddr, private: IpAddr, netmask: IpAddr },
    /// Advertise exactly `public` while binding exactly to `private`.
    ElasticExact { public: SocketAddr, private: SocketAddr },
    /// Bind exactly to the specified address and port; the address must
    /// belong to a local interface.
    Exact(SocketAddr),
}
impl Default for BindCfg {
fn default() -> Self {
BindCfg::Local
}
}
impl FromStr for BindCfg {
type Err = Error;
fn from_str(s: &str) -> result::Result<Self, Self::Err> {
match Self::parse_str(s) {
Ok(t) => Ok(t),
Err(e) => bail!("failed to parse '{}', {}", s, e),
}
}
}
impl BindCfg {
fn parse_str(s: &str) -> Result<Self> {
fn addr_and_netmask(s: &str) -> Result<(IpAddr, IpAddr)> {
let mut parts = s.splitn(2, '/');
let addr: IpAddr =
parts.next().ok_or_else(|| anyhow!("expected ip"))?.parse()?;
let bits: u32 =
parts.next().ok_or_else(|| anyhow!("expected netmask"))?.parse()?;
if parts.next().is_some() {
bail!("parse error, trailing garbage after netmask")
}
let netmask = match addr {
IpAddr::V4(_) => {
if bits > 32 {
bail!("invalid netmask");
}
IpAddr::V4(Ipv4Addr::from(u32::MAX.wrapping_shl(32 - bits)))
}
IpAddr::V6(_) => {
if bits > 128 {
bail!("invalid netmask");
}
IpAddr::V6(Ipv6Addr::from(u128::MAX.wrapping_shl(128 - bits)))
}
};
Ok((addr, netmask))
}
fn parse_elastic(s: &str) -> Result<BindCfg> {
let mut parts = s.splitn(2, '@');
let public = parts.next().ok_or_else(|| anyhow!("expected a public ip"))?;
match public.parse::<SocketAddr>() {
Ok(public) => {
let private = parts
.next()
.ok_or_else(|| anyhow!("expected private ip:port"))?
.parse::<SocketAddr>()?;
Ok(BindCfg::ElasticExact { public, private })
}
Err(_) => {
let public = public.parse::<IpAddr>()?;
let (private, netmask) = addr_and_netmask(
parts
.next()
.ok_or_else(|| anyhow!("expected private addr/netmask"))?,
)?;
Ok(BindCfg::Elastic { public, private, netmask })
}
}
}
if s.trim() == "local" {
Ok(BindCfg::Local)
} else {
match s.find("/") {
None => match s.find("@") {
None => Ok(BindCfg::Exact(s.parse()?)),
Some(_) => parse_elastic(s),
},
Some(_) => match s.find("@") {
None => {
let (addr, netmask) = addr_and_netmask(s)?;
Ok(BindCfg::Match { addr, netmask })
}
Some(_) => parse_elastic(s),
},
}
}
}
fn select_local_ip(&self, addr: &IpAddr, netmask: &IpAddr) -> Result<IpAddr> {
match addr {
IpAddr::V4(a) => {
if a.is_unspecified() {
return Ok(*addr);
}
}
IpAddr::V6(a) => {
if a.is_unspecified() {
return Ok(*addr);
}
}
}
let selected = get_if_addrs()?
.iter()
.filter_map(|i| match (i.ip(), addr, netmask) {
(IpAddr::V4(ip), IpAddr::V4(addr), IpAddr::V4(nm)) => {
let masked = Ipv4Addr::from(
u32::from_be_bytes(ip.octets()) & u32::from_be_bytes(nm.octets()),
);
if &masked == addr {
Some(IpAddr::V4(ip))
} else {
None
}
}
(IpAddr::V6(ip), IpAddr::V6(addr), IpAddr::V6(nm)) => {
let masked = Ipv6Addr::from(
u128::from_be_bytes(ip.octets())
& u128::from_be_bytes(nm.octets()),
);
if &masked == addr {
Some(IpAddr::V6(ip))
} else {
None
}
}
(_, _, _) => None,
})
.collect::<Vec<_>>();
if selected.len() == 1 {
Ok(selected[0])
} else if selected.len() == 0 {
bail!("no interface matches {:?}", self);
} else {
bail!("ambigous specification {:?} matches {:?}", self, selected);
}
}
fn select(&self) -> Result<(IpAddr, IpAddr)> {
match self {
BindCfg::Local => {
Ok((IpAddr::V4(Ipv4Addr::LOCALHOST), IpAddr::V4(Ipv4Addr::LOCALHOST)))
}
BindCfg::Exact(addr) => {
if get_if_addrs()?.iter().any(|i| i.ip() == addr.ip()) {
Ok((addr.ip(), addr.ip()))
} else {
bail!("no interface matches the bind address {:?}", addr);
}
}
BindCfg::Elastic { public, private, netmask } => {
let private = self.select_local_ip(private, netmask)?;
Ok((*public, private))
}
BindCfg::ElasticExact { public, private } => Ok((public.ip(), private.ip())),
BindCfg::Match { addr, netmask } => {
let private = self.select_local_ip(addr, netmask)?;
Ok((private, private))
}
}
}
}
// Unique id assigned to each connected client (subscriber connection).
atomic_id!(ClId);
// Global pools for frequently allocated containers. The two Pool::new
// arguments are size parameters of poolshark's pool (see its docs).
static BATCHES: LazyLock<Pool<Vec<WriteRequest>>> =
    LazyLock::new(|| Pool::new(100, 10_000));
static TOPUB: LazyLock<Pool<HashMap<Path, Option<u32>>>> =
    LazyLock::new(|| Pool::new(10, 10_000));
static TOUPUB: LazyLock<Pool<HashSet<Path>>> = LazyLock::new(|| Pool::new(5, 10_000));
static TOUSUB: LazyLock<Pool<HashMap<Id, Subscribed>>> =
    LazyLock::new(|| Pool::new(5, 10_000));
static RAWBATCH: LazyLock<Pool<Vec<BatchMsg>>> = LazyLock::new(|| Pool::new(100, 10_000));
static UPDATES: LazyLock<Pool<Vec<publisher::From>>> =
    LazyLock::new(|| Pool::new(100, 10_000));
static RAWUNSUBS: LazyLock<Pool<Vec<(ClId, Id)>>> =
    LazyLock::new(|| Pool::new(100, 10_000));
static UNSUBS: LazyLock<Pool<Vec<Id>>> = LazyLock::new(|| Pool::new(100, 10_000));
static BATCH: LazyLock<Pool<FxHashMap<ClId, Update>>> =
    LazyLock::new(|| Pool::new(100, 1000));
// Registry of all live publishers; Val::drop uses it to unpublish the
// value from every publisher (see impl Drop for Val).
static PUBLISHERS: LazyLock<Mutex<Vec<PublisherWeak>>> =
    LazyLock::new(|| Mutex::new(Vec::new()));
bitflags! {
    // Flags controlling how a value is published. Except for
    // DESTROY_ON_IDLE (which is handled locally and stripped before
    // paths are sent to the resolver), the bits are forwarded to the
    // resolver as-is; their exact semantics live in the resolver
    // protocol (not visible in this file).
    #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
    pub struct PublishFlags: u32 {
        const USE_EXISTING = 0x01;
        // handled locally: the value is flagged in destroy_on_idle (see
        // Publisher::publish_with_flags_and_writes)
        const DESTROY_ON_IDLE = 0x02;
        const ISOLATED = 0x04;
        const PREFER_LOCAL = 0x08;
        const FORCE_LOCAL = 0x10;
    }
}
/// A one-shot reply slot handed to write handlers. The first call to
/// [`SendResult::send`] delivers the value; later calls (from clones)
/// are silently ignored because the sender has already been taken.
#[derive(Clone, Debug)]
pub struct SendResult(Arc<Mutex<Option<oneshot::Sender<Value>>>>);
impl SendResult {
    /// Create a reply slot together with the receiver that will observe
    /// the eventual reply.
    fn new() -> (Self, oneshot::Receiver<Value>) {
        let (tx, rx) = oneshot::channel();
        let slot = Arc::new(Mutex::new(Some(tx)));
        (SendResult(slot), rx)
    }
    /// Deliver `v` to the waiting receiver, if nobody has replied yet.
    pub fn send(self, v: Value) {
        match self.0.lock().take() {
            None => (),
            Some(sender) => {
                let _ = sender.send(v);
            }
        }
    }
}
/// A write sent by a subscriber to a published value.
#[derive(Debug)]
pub struct WriteRequest {
    // the id of the value that was written
    pub id: Id,
    // the path of the value that was written
    pub path: Path,
    // the client that performed the write
    pub client: ClId,
    // the value that was written
    pub value: Value,
    // if Some, the client asked for a reply to this write
    pub send_result: Option<SendResult>,
}
/// Publisher lifecycle events, delivered to channels registered via
/// `Publisher::events` and `Publisher::events_for_id`.
#[derive(Debug, Clone, Copy)]
pub enum Event {
    // the published value with this id was destroyed
    Destroyed(Id),
    // the given client subscribed to the value with this id
    Subscribe(Id, ClId),
    // the given client unsubscribed from the value with this id
    Unsubscribe(Id, ClId),
}
// Per-client batch of protocol messages assembled by UpdateBatch::commit.
#[derive(Debug)]
struct Update {
    // updates to deliver to the client
    updates: GPooled<Vec<publisher::From>>,
    // forced unsubscribes to deliver to the client, allocated lazily
    unsubscribes: Option<GPooled<Vec<Id>>>,
}
impl Update {
    // fresh per-client batch with a pooled updates vector
    fn new() -> Self {
        Self { updates: UPDATES.take(), unsubscribes: None }
    }
}
// Queue of (optional flush timeout, batch) pairs to a client connection.
type MsgQ = Sender<(Option<Duration>, Update)>;
// Shared set of clients subscribed to a value (shared via hc_subscribed
// so values with identical subscriber sets reuse one allocation).
type Subscribed = Arc<FxHashSet<ClId>>;
/// Extended authorization callback, given the client id, value id, and
/// the client's user info if known. NOTE(review): presumably gates
/// subscription/write access — invoked by the connection server in
/// server.rs (not visible here); confirm there.
pub type ExtendedAuth =
    Box<dyn Fn(ClId, Id, Option<&UserInfo>) -> bool + Send + Sync + 'static>;
// Newtype so PublisherInner can derive Debug despite holding a closure.
#[repr(transparent)]
struct ExtendedAuthWrap(ExtendedAuth);
impl fmt::Debug for ExtendedAuthWrap {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "<Fn>")
    }
}
/// A handle to a published value. Dropping the handle destroys the
/// value in every live publisher, unpublishing all its paths.
#[derive(Debug)]
pub struct Val(Id);
impl Drop for Val {
    fn drop(&mut self) {
        // Tell every live publisher to destroy this id, pruning dead
        // publisher entries from the global registry as we go.
        PUBLISHERS.lock().retain(|weak| {
            if let Some(publisher) = weak.upgrade() {
                publisher.0.lock().destroy_val(self.0);
                true
            } else {
                false
            }
        })
    }
}
impl Val {
    /// Queue an update of this value to `v`, delivered to every
    /// subscriber when the batch is committed.
    pub fn update<T: Into<Value>>(&self, batch: &mut UpdateBatch, v: T) {
        batch.updates.push(BatchMsg::Update(None, self.0, v.into()))
    }
    /// Like [`Val::update`], but for conversions that can fail.
    pub fn try_update<T: TryInto<Value>>(
        &self,
        batch: &mut UpdateBatch,
        v: T,
    ) -> result::Result<(), T::Error> {
        let value = v.try_into()?;
        batch.updates.push(BatchMsg::Update(None, self.0, value));
        Ok(())
    }
    /// Queue an update to `v` that will only be delivered if, at commit
    /// time, `v` differs from the currently published value.
    pub fn update_changed<T: Into<Value>>(&self, batch: &mut UpdateBatch, v: T) {
        batch.updates.push(BatchMsg::UpdateChanged(self.0, v.into()))
    }
    /// Like [`Val::update_changed`], but for conversions that can fail.
    pub fn try_update_changed<T: TryInto<Value>>(
        &self,
        batch: &mut UpdateBatch,
        v: T,
    ) -> result::Result<(), T::Error> {
        let value = v.try_into()?;
        batch.updates.push(BatchMsg::UpdateChanged(self.0, value));
        Ok(())
    }
    /// Queue an update of this value addressed only to the subscriber
    /// `dst`; the stored current value is not changed.
    pub fn update_subscriber<T: Into<Value>>(
        &self,
        batch: &mut UpdateBatch,
        dst: ClId,
        v: T,
    ) {
        batch.updates.push(BatchMsg::Update(Some(dst), self.0, v.into()));
    }
    /// Like [`Val::update_subscriber`], but for conversions that can fail.
    pub fn try_update_subscriber<T: TryInto<Value>>(
        &self,
        batch: &mut UpdateBatch,
        dst: ClId,
        v: T,
    ) -> result::Result<(), T::Error> {
        let value = v.try_into()?;
        batch.updates.push(BatchMsg::Update(Some(dst), self.0, value));
        Ok(())
    }
    /// Queue a forced unsubscribe of `dst` from this value.
    pub fn unsubscribe(&self, batch: &mut UpdateBatch, dst: ClId) {
        batch.unsubscribes.get_or_insert_with(|| RAWUNSUBS.take()).push((dst, self.0));
    }
    /// The stable id of this published value.
    pub fn id(&self) -> Id {
        self.0
    }
}
/// Handle to a default (wildcard) publication rooted at `path`. It is a
/// `Stream` of `(path, reply)` pairs arriving on the internal channel
/// (registered in `PublisherInner::default`), and can advertise
/// concrete paths under the base. Dropping the handle withdraws the
/// default publication and its advertisements.
pub struct DefaultHandle {
    // incoming requests for paths under the base path
    chan: UnboundedReceiver<(Path, oneshot::Sender<()>)>,
    // the base path this default publisher covers
    path: Path,
    // weak reference so the handle does not keep the publisher alive
    publisher: PublisherWeak,
}
// DefaultHandle streams the (path, reply) requests arriving on its
// internal channel; polling simply delegates to that channel.
impl Stream for DefaultHandle {
    type Item = (Path, oneshot::Sender<()>);
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        Pin::new(&mut self.chan).poll_next(cx)
    }
}
// Terminated exactly when the underlying channel is terminated.
impl FusedStream for DefaultHandle {
    fn is_terminated(&self) -> bool {
        self.chan.is_terminated()
    }
}
impl DefaultHandle {
    /// Advertise `path` under this default publisher with `flags`.
    ///
    /// `path` must be under the handle's base path. DESTROY_ON_IDLE is
    /// stripped, since an advertisement has no published value to
    /// destroy. If the path is already published directly, nothing is
    /// queued for the resolver.
    pub fn advertise_with_flags(
        &self,
        mut flags: PublishFlags,
        path: Path,
    ) -> Result<()> {
        if !Path::is_parent(&*self.path, &path) {
            bail!("advertisements must be under the default publisher path")
        }
        let pb = match self.publisher.upgrade() {
            Some(pb) => pb,
            // the publisher is gone; advertising is a silent no-op
            None => return Ok(()),
        };
        let mut pbl = pb.0.lock();
        // record the advertisement; `inserted` is false if it already existed
        let inserted =
            pbl.advertised.entry(self.path.clone()).or_default().insert(path.clone());
        if inserted && !pbl.by_path.contains_key(&path) {
            flags.remove(PublishFlags::DESTROY_ON_IDLE);
            pbl.to_unpublish.remove(&path);
            let flags = if flags.is_empty() { None } else { Some(flags.bits()) };
            pbl.to_publish.insert(path, flags);
            pbl.trigger_publish()
        }
        Ok(())
    }
    /// Advertise `path` with no flags.
    pub fn advertise(&self, path: Path) -> Result<()> {
        self.advertise_with_flags(PublishFlags::empty(), path)
    }
    /// Withdraw a previous advertisement of `path`. If the path is not
    /// also published directly, it is queued to be unpublished at the
    /// resolver.
    pub fn remove_advertisement(&self, path: &Path) {
        if let Some(pb) = self.publisher.upgrade() {
            let mut pbl = pb.0.lock();
            let removed = match pbl.advertised.get_mut(&self.path) {
                None => false,
                Some(set) => {
                    let was_present = set.remove(path);
                    // drop the whole entry once the last advertisement is gone
                    if set.is_empty() {
                        pbl.advertised.remove(&self.path);
                    }
                    was_present
                }
            };
            if removed && !pbl.by_path.contains_key(path) {
                pbl.to_publish.remove(path);
                pbl.to_unpublish.insert(path.clone());
                pbl.trigger_publish()
            }
        }
    }
}
impl Drop for DefaultHandle {
    fn drop(&mut self) {
        // Withdraw the default publication and every advertisement made
        // through this handle, then wake the publish loop.
        let Some(publisher) = self.publisher.upgrade() else { return };
        let mut pb = publisher.0.lock();
        pb.default.remove(self.path.as_ref());
        pb.to_unpublish_default.insert(self.path.clone());
        let advertised = pb.advertised.remove(&self.path);
        for path in advertised.into_iter().flatten() {
            // paths also published directly stay published
            if !pb.by_path.contains_key(&path) {
                pb.to_publish.remove(&path);
                pb.to_unpublish.insert(path);
            }
        }
        pb.trigger_publish()
    }
}
/// A single message queued in an [`UpdateBatch`].
#[derive(Debug, Clone)]
pub enum BatchMsg {
    // update the value only if the new value differs from the current one
    UpdateChanged(Id, Value),
    // update the value; if the ClId is Some, deliver only to that client
    // (without changing the stored current value)
    Update(Option<ClId>, Id, Value),
}
/// A batch of updates to published values. Nothing is sent to
/// subscribers until the batch is committed.
#[must_use = "update batches do nothing unless committed"]
#[derive(Debug)]
pub struct UpdateBatch {
    // the publisher this batch belongs to; batches from different
    // publishers cannot be merged (see merge_from)
    origin: Publisher,
    // queued updates, applied in order at commit time
    updates: GPooled<Vec<BatchMsg>>,
    // queued forced unsubscribes, allocated lazily from the pool
    unsubscribes: Option<GPooled<Vec<(ClId, Id)>>>,
}
impl UpdateBatch {
    /// The number of updates queued in this batch (queued unsubscribes
    /// are not counted).
    pub fn len(&self) -> usize {
        self.updates.len()
    }
    /// Iterate over the queued update messages.
    pub fn iter(&self) -> impl Iterator<Item = &BatchMsg> {
        self.updates.iter()
    }
    /// Move all updates and unsubscribes from `other` into this batch,
    /// leaving `other` empty. Fails if the two batches were created by
    /// different publishers.
    pub fn merge_from(&mut self, other: &mut UpdateBatch) -> Result<()> {
        if Arc::as_ptr(&self.origin.0) != Arc::as_ptr(&other.origin.0) {
            bail!("can't merge batches from different publishers");
        } else {
            self.updates.extend(other.updates.drain(..));
            match (&mut self.unsubscribes, &mut other.unsubscribes) {
                (None, None) | (Some(_), None) => (),
                (None, Some(_)) => {
                    self.unsubscribes = other.unsubscribes.take();
                }
                (Some(l), Some(r)) => {
                    l.extend(r.drain(..));
                }
            }
            Ok(())
        }
    }
    /// Commit the batch, delivering the queued updates to subscribers.
    ///
    /// Per-client protocol batches are assembled while holding the
    /// publisher lock (this is also when each value's `current` is
    /// updated), but the actual sends are awaited only after the lock
    /// is released. `timeout`, if Some, is forwarded to each client
    /// connection along with its batch.
    pub async fn commit(mut self, timeout: Option<Duration>) {
        let empty = self.updates.is_empty()
            && self.unsubscribes.as_ref().map(|v| v.len()).unwrap_or(0) == 0;
        if empty {
            return;
        }
        let fut = {
            let mut batch = BATCH.take();
            let mut pb = self.origin.0.lock();
            for m in self.updates.drain(..) {
                match m {
                    // broadcast update: fan out to every subscriber and
                    // record the new current value
                    BatchMsg::Update(None, id, v) => {
                        if let Some(pbl) = pb.by_id.get_mut(&id) {
                            for cl in pbl.subscribed.iter() {
                                batch
                                    .entry(*cl)
                                    .or_insert_with(Update::new)
                                    .updates
                                    .push(publisher::From::Update(id, v.clone()));
                            }
                            pbl.current = v;
                        }
                    }
                    // like a broadcast update, but skipped entirely when
                    // the value is unchanged
                    BatchMsg::UpdateChanged(id, v) => {
                        if let Some(pbl) = pb.by_id.get_mut(&id) {
                            if pbl.current != v {
                                for cl in pbl.subscribed.iter() {
                                    batch
                                        .entry(*cl)
                                        .or_insert_with(Update::new)
                                        .updates
                                        .push(publisher::From::Update(id, v.clone()));
                                }
                                pbl.current = v;
                            }
                        }
                    }
                    // targeted update: delivered only to `cl`, does not
                    // touch the stored current value
                    BatchMsg::Update(Some(cl), id, v) => batch
                        .entry(cl)
                        .or_insert_with(Update::new)
                        .updates
                        .push(publisher::From::Update(id, v)),
                }
            }
            if let Some(usubs) = &mut self.unsubscribes {
                for (cl, id) in usubs.drain(..) {
                    let update = batch.entry(cl).or_insert_with(Update::new);
                    match &mut update.unsubscribes {
                        Some(u) => u.push(id),
                        None => {
                            let mut u = UNSUBS.take();
                            u.push(id);
                            update.unsubscribes = Some(u);
                        }
                    }
                }
            }
            // build the send futures for currently connected clients;
            // the lock guard `pb` is dropped at the end of this block,
            // before the sends are awaited below
            future::join_all(
                batch
                    .drain()
                    .filter_map(|(cl, batch)| {
                        pb.clients.get(&cl).map(move |cl| (cl.msg_queue.clone(), batch))
                    })
                    .map(|(mut q, batch)| async move {
                        let _: Result<_, _> = q.send((timeout, batch)).await;
                    }),
            )
        };
        fut.await;
    }
}
// State kept for one connected client (subscriber connection).
#[derive(Debug)]
struct Client {
    // queue of update batches heading to this client's connection task
    msg_queue: MsgQ,
    // per-value permissions recorded for this client's subscriptions
    subscribed: FxHashMap<Id, Permissions>,
    // authenticated user info, if the transport established it
    user: Option<UserInfo>,
}
/// The publisher-side state of a single published value.
#[derive(Debug)]
pub struct Published {
    // the most recently committed value
    current: Value,
    // the (shared) set of clients subscribed to this value
    subscribed: Subscribed,
    // the primary path the value is published under
    path: Path,
    // extra paths (aliases); boxed to keep Published small in the
    // common no-alias case
    aliases: Option<Box<FxHashSet<Path>>>,
}
impl Published {
    /// The current value.
    pub fn current(&self) -> &Value {
        &self.current
    }
    /// The primary path the value is published under.
    pub fn path(&self) -> &Path {
        &self.path
    }
    /// The set of clients subscribed to this value.
    pub fn subscribed(&self) -> &FxHashSet<ClId> {
        &self.subscribed
    }
}
// Shared mutable publisher state; always accessed via the Mutex inside
// Publisher / PublisherWeak.
#[derive(Debug)]
struct PublisherInner {
    // the advertised socket address of this publisher
    addr: SocketAddr,
    // used to stop the accept loop; None once shutdown has begun
    stop: Option<oneshot::Sender<oneshot::Sender<()>>>,
    // connected clients by id
    clients: FxHashMap<ClId, Client>,
    // hash-consed subscriber sets: values with identical subscriber
    // sets share one Arc'd allocation
    hc_subscribed: FxHashMap<BTreeSet<ClId>, Subscribed>,
    // published values indexed by path (primary paths and aliases)
    by_path: HashMap<Path, Id>,
    // published values indexed by id
    by_id: FxHashMap<Id, Published>,
    // ids flagged DESTROY_ON_IDLE at publish time; presumably destroyed
    // when their last subscriber goes away (handled in server.rs,
    // not visible here)
    destroy_on_idle: FxHashSet<Id>,
    // reverse index of write channels -> (channel id, ids it watches)
    on_write_chans:
        FxHashMap<ChanWrap<GPooled<Vec<WriteRequest>>>, (ChanId, HashSet<Id>)>,
    // channels receiving every Event
    on_event_chans: Vec<UnboundedSender<Event>>,
    // channels receiving Events for specific ids only
    on_event_by_id_chans: FxHashMap<Id, Vec<UnboundedSender<Event>>>,
    // optional extended authorization callback (see ExtendedAuth)
    extended_auth: Option<ExtendedAuthWrap>,
    // write channels registered per value id
    on_write: FxHashMap<Id, Vec<(ChanId, Sender<GPooled<Vec<WriteRequest>>>)>>,
    // handle used to publish/unpublish paths at the resolver
    resolver: ResolverWrite,
    // advertised paths, keyed by default publisher base path
    advertised: HashMap<Path, HashSet<Path>>,
    // paths queued to be published at the resolver (with optional flag bits)
    to_publish: GPooled<HashMap<Path, Option<u32>>>,
    // default publisher base paths queued to be published
    to_publish_default: GPooled<HashMap<Path, Option<u32>>>,
    // paths queued to be unpublished
    to_unpublish: GPooled<HashSet<Path>>,
    // default base paths queued to be unpublished
    to_unpublish_default: GPooled<HashSet<Path>>,
    // subscribers to forcibly unsubscribe from destroyed values
    to_unsubscribe: GPooled<HashMap<Id, Subscribed>>,
    // true when the publish loop was woken but has not yet run,
    // coalescing repeated triggers
    publish_triggered: bool,
    // wakes the publish loop; Some(reply) requests a flush notification
    trigger_publish: UnboundedSender<Option<oneshot::Sender<()>>>,
    // tasks waiting for a specific value to gain a subscriber
    wait_clients: FxHashMap<Id, Vec<oneshot::Sender<()>>>,
    // tasks waiting for any client to connect
    wait_any_client: Vec<oneshot::Sender<()>>,
    // default publishers by base path; each receives (path, reply) requests
    default: BTreeMap<Path, UnboundedSender<(Path, oneshot::Sender<()>)>>,
}
impl PublisherInner {
    // true if `path` is advertised under any default publisher base
    fn is_advertised(&self, path: &Path) -> bool {
        self.advertised
            .iter()
            .any(|(b, set)| Path::is_parent(&**b, &**path) && set.contains(path))
    }
    /// Check that `path` may be published now: it must be absolute, not
    /// already published here, and the publisher must still be running.
    pub fn check_publish(&self, path: &Path) -> Result<()> {
        if !Path::is_absolute(&path) {
            bail!("can't publish a relative path")
        }
        if self.stop.is_none() {
            bail!("publisher is dead")
        }
        if self.by_path.contains_key(path) {
            bail!("already published")
        }
        Ok(())
    }
    /// Record that `id` is published at `path` and queue the path to be
    /// published at the resolver with `flags`, waking the publish loop.
    pub fn publish(&mut self, id: Id, flags: PublishFlags, path: Path) {
        self.by_path.insert(path.clone(), id);
        self.to_unpublish.remove(&path);
        self.to_publish.insert(
            path.clone(),
            if flags.is_empty() { None } else { Some(flags.bits()) },
        );
        self.trigger_publish();
    }
    // Remove `path` from the path index; unless the path is still
    // advertised by a default publisher, queue it to be unpublished.
    fn unpublish(&mut self, path: &Path) {
        self.by_path.remove(path);
        if !self.is_advertised(path) {
            self.to_publish.remove(path);
            self.to_unpublish.insert(path.clone());
            self.trigger_publish();
        }
    }
    // Destroy the published value `id`: unpublish its primary path and
    // all aliases, drop waiters and write channel registrations, emit
    // Event::Destroyed, and queue forced unsubscribes for any remaining
    // subscribers.
    fn destroy_val(&mut self, id: Id) {
        if let Some(pbl) = self.by_id.remove(&id) {
            let path = pbl.path;
            for path in iter::once(&path).chain(pbl.aliases.iter().flat_map(|v| v.iter()))
            {
                self.unpublish(path)
            }
            self.wait_clients.remove(&id);
            if let Some(chans) = self.on_write.remove(&id) {
                for (_, c) in chans {
                    // drop this id from the reverse index, removing the
                    // channel entry entirely once it watches nothing
                    match self.on_write_chans.entry(ChanWrap(c)) {
                        Entry::Vacant(_) => (),
                        Entry::Occupied(mut e) => {
                            e.get_mut().1.remove(&id);
                            if e.get().1.is_empty() {
                                e.remove();
                            }
                        }
                    }
                }
            }
            self.send_event(Event::Destroyed(id));
            self.on_event_by_id_chans.remove(&id);
            if pbl.subscribed.len() > 0 {
                self.to_unsubscribe.insert(id, pbl.subscribed);
            }
        }
    }
    // Deliver `event` to all registered event channels, pruning any
    // channel whose receiver has been dropped.
    fn send_event(&mut self, event: Event) {
        self.on_event_chans.retain(|chan| chan.unbounded_send(event).is_ok());
        let id = match &event {
            Event::Destroyed(id) => id,
            Event::Subscribe(id, _) => id,
            Event::Unsubscribe(id, _) => id,
        };
        if let Some(chans) = self.on_event_by_id_chans.get_mut(id) {
            chans.retain(|chan| chan.unbounded_send(event).is_ok());
            if chans.is_empty() {
                self.on_event_by_id_chans.remove(id);
            }
        }
    }
    // Wake the publish loop, coalescing repeated triggers until it runs.
    fn trigger_publish(&mut self) {
        if !self.publish_triggered {
            self.publish_triggered = true;
            let _: Result<_, _> = self.trigger_publish.unbounded_send(None);
        }
    }
    // Register `tx` to receive writes to `id` (a no-op if `id` is not
    // published), garbage collecting previously registered channels
    // that are now closed.
    fn writes(&mut self, id: Id, tx: Sender<GPooled<Vec<WriteRequest>>>) {
        if self.by_id.contains_key(&id) {
            let e = self
                .on_write_chans
                .entry(ChanWrap(tx.clone()))
                .or_insert_with(|| (ChanId::new(), HashSet::new()));
            e.1.insert(id);
            let cid = e.0;
            let mut gc = Vec::new();
            let ow = self.on_write.entry(id).or_insert_with(Vec::new);
            ow.retain(|(_, c)| {
                if c.is_closed() {
                    gc.push(ChanWrap(c.clone()));
                    false
                } else {
                    true
                }
            });
            ow.push((cid, tx));
            for c in gc {
                self.on_write_chans.remove(&c);
            }
        }
    }
}
impl Drop for PublisherInner {
    fn drop(&mut self) {
        // Option::take instead of mem::replace(.., None) — same effect,
        // idiomatic form (clippy: mem_replace_option_with_none). Taking
        // `stop` signals the accept loop to shut down and gives us a
        // channel to await its completion.
        if let Some(stop) = self.stop.take() {
            let (tx, rx) = oneshot::channel();
            let _ = stop.send(tx);
            let resolver = self.resolver.clone();
            // after the server has stopped, best-effort clear all of
            // our paths from the resolver
            tokio::spawn(async move {
                let _ = rx.await;
                let _ = resolver.clear().await;
            });
        }
    }
}
#[derive(Clone)]
struct PublisherWeak(Weak<Mutex<PublisherInner>>);
impl PublisherWeak {
fn upgrade(&self) -> Option<Publisher> {
Weak::upgrade(&self.0).map(|r| Publisher(r))
}
}
/// Pick a port in `[current, current + 10)` at random; used by the port
/// scanning loop in `Publisher::new` to probe for a free port.
fn rand_port(current: u16) -> u16 {
    let jitter: u16 = rand::rng().random_range(0u16..10u16);
    current + jitter
}
/// Builder for [`Publisher`]: construct with [`PublisherBuilder::new`],
/// adjust the optional settings, then call `build`.
#[derive(Debug)]
pub struct PublisherBuilder {
    // resolver cluster config; required, consumed by build
    config: Option<Config>,
    // auth mechanism; defaults to the config's default auth
    desired_auth: Option<DesiredAuth>,
    // bind configuration; defaults to the config's default bind config
    bind_cfg: Option<BindCfg>,
    // publisher priority advertised to the resolver
    priority: PublisherPriority,
    // maximum number of simultaneous client connections
    max_clients: usize,
    // slack parameter forwarded to the connection server (see server::start)
    slack: usize,
}
impl PublisherBuilder {
    /// Start building a publisher that will use `config` to reach the
    /// resolver cluster. Defaults: config's auth and bind config,
    /// normal priority, 768 max clients, slack of 3.
    pub fn new(config: Config) -> Self {
        Self {
            config: Some(config),
            desired_auth: None,
            bind_cfg: None,
            priority: PublisherPriority::Normal,
            max_clients: 768,
            slack: 3,
        }
    }
    /// Consume the collected settings and create the publisher. The
    /// config is taken out of the builder, so calling `build` a second
    /// time on the same builder fails.
    pub async fn build(&mut self) -> Result<Publisher> {
        let cfg = self.config.take().ok_or_else(|| anyhow!("config is required"))?;
        let auth = self.desired_auth.take().unwrap_or_else(|| cfg.default_auth());
        let bind = match self.bind_cfg.take() {
            Some(b) => b,
            None => cfg.default_bind_config.clone(),
        };
        Publisher::new(cfg, auth, bind, self.priority, self.max_clients, self.slack)
            .await
    }
    /// Set the authentication mechanism to use.
    pub fn desired_auth(&mut self, auth: DesiredAuth) -> &mut Self {
        self.desired_auth = Some(auth);
        self
    }
    /// Set (or clear) the bind configuration.
    pub fn bind_cfg(&mut self, bind: Option<BindCfg>) -> &mut Self {
        self.bind_cfg = bind;
        self
    }
    /// Set the maximum number of simultaneous client connections.
    pub fn max_clients(&mut self, max_clients: usize) -> &mut Self {
        self.max_clients = max_clients;
        self
    }
    /// Set the slack parameter passed to the connection server.
    pub fn slack(&mut self, slack: usize) -> &mut Self {
        self.slack = slack;
        self
    }
    /// Set the publisher priority advertised to the resolver.
    pub fn priority(&mut self, priority: PublisherPriority) -> &mut Self {
        self.priority = priority;
        self
    }
}
/// A handle to a running publisher. Cloning is cheap (an Arc clone);
/// all clones share the same underlying mutex-protected state.
#[derive(Debug, Clone)]
pub struct Publisher(Arc<Mutex<PublisherInner>>);
impl Publisher {
    // get a weak handle that will not keep the publisher alive
    fn downgrade(&self) -> PublisherWeak {
        PublisherWeak(Arc::downgrade(&self.0))
    }
    /// Create a new publisher.
    ///
    /// Binds a listener according to `bind_cfg` (scanning ports in
    /// [5000, 32768) when no exact port was given), registers the
    /// advertised (public) address with the resolver, and spawns both
    /// the accept loop and the publish loop as background tasks.
    pub async fn new(
        resolver: Config,
        desired_auth: DesiredAuth,
        bind_cfg: BindCfg,
        priority: PublisherPriority,
        max_clients: usize,
        slack: usize,
    ) -> Result<Publisher> {
        let (public, private) = bind_cfg.select()?;
        utils::check_addr(public, &resolver.addrs)?;
        let (addr, listener) = match bind_cfg {
            BindCfg::Exact(addr) => {
                let l = TcpListener::bind(&addr).await?;
                (l.local_addr()?, l)
            }
            BindCfg::ElasticExact { public, private } => {
                // bind the private address, advertise the public one
                let l = TcpListener::bind(&private).await?;
                (public, l)
            }
            BindCfg::Match { .. } | BindCfg::Local | BindCfg::Elastic { .. } => {
                let mkaddr = |ip: IpAddr, port: u16| -> Result<SocketAddr> {
                    Ok((ip, port)
                        .to_socket_addrs()?
                        .next()
                        .ok_or_else(|| anyhow!("socketaddrs bug"))?)
                };
                // randomized port scan starting at 5000
                let mut port = 5000;
                loop {
                    if port >= 32768 {
                        bail!("couldn't allocate a port");
                    }
                    port = rand_port(port);
                    let addr = mkaddr(private, port)?;
                    match TcpListener::bind(&addr).await {
                        Ok(l) => break (mkaddr(public, port)?, l),
                        Err(e) => {
                            // only AddrInUse is retryable; anything else
                            // is a real error
                            if e.kind() != std::io::ErrorKind::AddrInUse {
                                bail!(e)
                            }
                        }
                    }
                }
            }
        };
        let tls_ctx = resolver.tls.clone().map(tls::CachedAcceptor::new);
        let resolver =
            ResolverWrite::new(resolver, desired_auth.clone(), addr, priority)?;
        let (stop, receive_stop) = oneshot::channel();
        let (tx_trigger, rx_trigger) = unbounded();
        let pb = Publisher(Arc::new(Mutex::new(PublisherInner {
            addr,
            stop: Some(stop),
            clients: HashMap::default(),
            hc_subscribed: HashMap::default(),
            by_path: HashMap::new(),
            by_id: HashMap::default(),
            destroy_on_idle: HashSet::default(),
            on_write_chans: HashMap::default(),
            on_event_chans: Vec::new(),
            on_event_by_id_chans: HashMap::default(),
            extended_auth: None,
            on_write: HashMap::default(),
            resolver,
            advertised: HashMap::new(),
            to_publish: TOPUB.take(),
            to_publish_default: TOPUB.take(),
            to_unpublish: TOUPUB.take(),
            to_unpublish_default: TOUPUB.take(),
            to_unsubscribe: TOUSUB.take(),
            publish_triggered: false,
            trigger_publish: tx_trigger,
            wait_clients: HashMap::default(),
            wait_any_client: Vec::new(),
            default: BTreeMap::new(),
        })));
        // accept loop: serves client connections until stop is signaled
        task::spawn({
            let pb_weak = pb.downgrade();
            async move {
                server::start(
                    pb_weak.clone(),
                    listener,
                    receive_stop,
                    desired_auth,
                    tls_ctx,
                    max_clients,
                    slack,
                )
                .await;
                info!("accept loop shutdown");
            }
        });
        // publish loop: flushes queued (un)publish operations to the resolver
        task::spawn({
            let pb_weak = pb.downgrade();
            async move {
                publish_loop(pb_weak, rx_trigger).await;
                info!("publish loop shutdown")
            }
        });
        // register globally so dropping a Val can unpublish from this publisher
        PUBLISHERS.lock().push(pb.downgrade());
        Ok(pb)
    }
    /// Install the extended authorization callback (see [`ExtendedAuth`]).
    pub fn set_extended_authorization(&self, f: ExtendedAuth) {
        self.0.lock().extended_auth = Some(ExtendedAuthWrap(f));
    }
    /// Remove any installed extended authorization callback.
    pub fn clear_extended_authorization(&self) {
        self.0.lock().extended_auth = None;
    }
    /// Shut the publisher down: stop the accept loop, wait for it to
    /// finish, drop all client and value state, and clear our paths
    /// from the resolver (best effort).
    pub async fn shutdown(self) {
        let stopped = mem::replace(&mut self.0.lock().stop, None).map(|stop| {
            let (tx, rx) = oneshot::channel();
            let _ = stop.send(tx);
            rx
        });
        if let Some(stopped) = stopped {
            let _ = stopped.await;
        }
        let resolver = {
            let mut inner = self.0.lock();
            inner.clients.clear();
            inner.by_id.clear();
            inner.resolver.clone()
        };
        let _: Result<_> = resolver.clear().await;
    }
    /// The advertised socket address of this publisher.
    pub fn addr(&self) -> SocketAddr {
        self.0.lock().addr
    }
    /// Publish `path` with initial value `init` and the given flags,
    /// optionally registering `tx` to receive writes to the value.
    /// Fails if `path` is relative, already published here, or the
    /// publisher is shut down.
    pub fn publish_with_flags_and_writes<T>(
        &self,
        mut flags: PublishFlags,
        path: Path,
        init: T,
        tx: Option<Sender<GPooled<Vec<WriteRequest>>>>,
    ) -> Result<Val>
    where
        T: TryInto<Value>,
        <T as TryInto<Value>>::Error: std::error::Error + Send + Sync + 'static,
    {
        let init: Value = init.try_into()?;
        let id = Id::new();
        // DESTROY_ON_IDLE is handled locally; strip it before the flags
        // are queued for the resolver
        let destroy_on_idle = flags.contains(PublishFlags::DESTROY_ON_IDLE);
        flags.remove(PublishFlags::DESTROY_ON_IDLE);
        let mut pb = self.0.lock();
        pb.check_publish(&path)?;
        // start with the shared empty subscriber set (hash-consed)
        let subscribed = pb
            .hc_subscribed
            .entry(BTreeSet::new())
            .or_insert_with(|| Arc::new(HashSet::default()))
            .clone();
        pb.by_id.insert(
            id,
            Published { current: init, subscribed, path: path.clone(), aliases: None },
        );
        if destroy_on_idle {
            pb.destroy_on_idle.insert(id);
        }
        if let Some(tx) = tx {
            pb.writes(id, tx);
        }
        pb.publish(id, flags, path.clone());
        Ok(Val(id))
    }
    /// [`Publisher::publish_with_flags_and_writes`] without a write channel.
    pub fn publish_with_flags<T>(
        &self,
        flags: PublishFlags,
        path: Path,
        init: T,
    ) -> Result<Val>
    where
        T: TryInto<Value>,
        <T as TryInto<Value>>::Error: std::error::Error + Send + Sync + 'static,
    {
        self.publish_with_flags_and_writes(flags, path, init, None)
    }
    /// Publish the existing value `id` under the additional path `path`
    /// with the given flags. DESTROY_ON_IDLE is ignored for aliases.
    pub fn alias_with_flags(
        &self,
        id: Id,
        mut flags: PublishFlags,
        path: Path,
    ) -> Result<()> {
        flags.remove(PublishFlags::DESTROY_ON_IDLE);
        let mut pb = self.0.lock();
        if !pb.by_id.contains_key(&id) {
            bail!("no such value published by this publisher")
        }
        pb.check_publish(&path)?;
        pb.publish(id, flags, path.clone());
        // record the alias on the value; unwrap is safe, existence was
        // checked above and the lock is still held
        let v = pb.by_id.get_mut(&id).unwrap();
        match &mut v.aliases {
            Some(a) => {
                a.insert(path);
            }
            None => {
                let mut set = HashSet::default();
                set.insert(path);
                v.aliases = Some(Box::new(set))
            }
        }
        Ok(())
    }
    /// Publish `path` with initial value `init` and no flags.
    pub fn publish<T>(&self, path: Path, init: T) -> Result<Val>
    where
        T: TryInto<Value>,
        <T as TryInto<Value>>::Error: std::error::Error + Send + Sync + 'static,
    {
        self.publish_with_flags(PublishFlags::empty(), path, init)
    }
    /// Alias the value `id` at `path` with no flags.
    pub fn alias(&self, id: Id, path: Path) -> Result<()> {
        self.alias_with_flags(id, PublishFlags::empty(), path)
    }
    /// Remove the alias `path` of the value `id`, if it exists.
    pub fn remove_alias(&self, id: Id, path: &Path) {
        let mut pb = self.0.lock();
        if let Some(pbv) = pb.by_id.get_mut(&id) {
            if let Some(al) = &mut pbv.aliases {
                if al.remove(path) {
                    pb.unpublish(path)
                }
            }
        }
    }
    /// Remove every alias of the value `id`, leaving only its primary path.
    pub fn remove_all_aliases(&self, id: Id) {
        let mut pb = self.0.lock();
        if let Some(pbv) = pb.by_id.get_mut(&id) {
            if let Some(mut al) = pbv.aliases.take() {
                for path in al.drain() {
                    pb.unpublish(&path)
                }
            }
        }
    }
    /// Publish the default (wildcard) publisher rooted at `base` with
    /// the given flags. Fails if `base` is relative, a default is
    /// already published at `base`, or the publisher is shut down.
    pub fn publish_default_with_flags(
        &self,
        flags: PublishFlags,
        base: Path,
    ) -> Result<DefaultHandle> {
        if !Path::is_absolute(base.as_ref()) {
            bail!("can't publish a relative path")
        }
        let (tx, rx) = unbounded();
        let mut pb = self.0.lock();
        if pb.default.contains_key(&base) {
            bail!("default is already published")
        }
        if pb.stop.is_none() {
            bail!("publisher is dead")
        }
        pb.to_unpublish.remove(base.as_ref());
        pb.to_publish_default.insert(
            base.clone(),
            if flags.is_empty() { None } else { Some(flags.bits()) },
        );
        pb.default.insert(base.clone(), tx);
        pb.trigger_publish();
        Ok(DefaultHandle { chan: rx, path: base, publisher: self.downgrade() })
    }
    /// Publish the default publisher rooted at `base` with no flags.
    pub fn publish_default(&self, base: Path) -> Result<DefaultHandle> {
        self.publish_default_with_flags(PublishFlags::empty(), base)
    }
    /// Create an empty update batch bound to this publisher.
    pub fn start_batch(&self) -> UpdateBatch {
        UpdateBatch { origin: self.clone(), updates: RAWBATCH.take(), unsubscribes: None }
    }
    /// Wait until the publish loop has processed everything queued up
    /// to this point (see `publish_loop`).
    pub async fn flushed(&self) {
        let (tx, rx) = oneshot::channel();
        let _: Result<_, _> = self.0.lock().trigger_publish.unbounded_send(Some(tx));
        let _ = rx.await;
    }
    /// The number of currently connected clients.
    pub fn clients(&self) -> usize {
        self.0.lock().clients.len()
    }
    /// Wait until at least one client is connected; returns immediately
    /// if one already is.
    pub async fn wait_any_client(&self) {
        let wait = {
            let mut inner = self.0.lock();
            if inner.clients.len() > 0 {
                return;
            } else {
                let (tx, rx) = oneshot::channel();
                inner.wait_any_client.push(tx);
                rx
            }
        };
        let _ = wait.await;
    }
    /// Wait for the next client to connect, even if some are already
    /// connected.
    pub async fn wait_any_new_client(&self) {
        let (tx, rx) = oneshot::channel();
        self.0.lock().wait_any_client.push(tx);
        let _ = rx.await;
    }
    /// Wait until the value `id` has at least one subscriber; returns
    /// immediately if it already has one, or if `id` is not published.
    pub async fn wait_client(&self, id: Id) {
        let wait = {
            let mut inner = self.0.lock();
            match inner.by_id.get(&id) {
                None => return,
                Some(ut) => {
                    if ut.subscribed.len() > 0 {
                        return;
                    }
                    let (tx, rx) = oneshot::channel();
                    inner.wait_clients.entry(id).or_insert_with(Vec::new).push(tx);
                    rx
                }
            }
        };
        let _ = wait.await;
    }
    /// Look up the id of the value published at `path`, if any.
    pub fn id<S: AsRef<str>>(&self, path: S) -> Option<Id> {
        self.0.lock().by_path.get(path.as_ref()).map(|id| *id)
    }
    /// The primary path of the value `id`, if it is published.
    pub fn path(&self, id: Id) -> Option<Path> {
        self.0.lock().by_id.get(&id).map(|pbl| pbl.path.clone())
    }
    /// All aliases of the value `id` (not including its primary path).
    pub fn aliases(&self, id: Id) -> Vec<Path> {
        let pb = self.0.lock();
        match pb.by_id.get(&id) {
            None => vec![],
            Some(pbv) => match &pbv.aliases {
                None => vec![],
                Some(al) => al.iter().cloned().collect(),
            },
        }
    }
    /// The current value of `id`, if it is published.
    pub fn current(&self, id: &Id) -> Option<Value> {
        self.0.lock().by_id.get(&id).map(|p| p.current.clone())
    }
    /// The clients subscribed to `id`; empty if `id` is not published.
    pub fn subscribed(&self, id: &Id) -> Vec<ClId> {
        self.0
            .lock()
            .by_id
            .get(&id)
            .map(|p| p.subscribed.iter().copied().collect::<Vec<_>>())
            .unwrap_or_else(Vec::new)
    }
    /// Extend `into` with the clients subscribed to `id`, without
    /// allocating an intermediate Vec.
    pub fn put_subscribed(&self, id: &Id, into: &mut impl Extend<ClId>) {
        if let Some(p) = self.0.lock().by_id.get(&id) {
            into.extend(p.subscribed.iter().copied())
        }
    }
    /// True if `client` is subscribed to the value `id`.
    pub fn is_subscribed(&self, id: &Id, client: &ClId) -> bool {
        match self.0.lock().by_id.get(&id) {
            Some(p) => p.subscribed.contains(client),
            None => false,
        }
    }
    /// The authenticated user info of `client`, if known.
    pub fn user(&self, client: &ClId) -> Option<UserInfo> {
        self.0.lock().clients.get(client).and_then(|c| c.user.clone())
    }
    /// The number of clients subscribed to `id` (0 if not published).
    pub fn subscribed_len(&self, id: &Id) -> usize {
        self.0.lock().by_id.get(&id).map(|p| p.subscribed.len()).unwrap_or(0)
    }
    /// Register `tx` to receive writes made to the value `id`.
    pub fn writes(&self, id: Id, tx: Sender<GPooled<Vec<WriteRequest>>>) {
        self.0.lock().writes(id, tx)
    }
    /// Remove all write channels registered for the value `id`.
    pub fn stop_writes(&self, id: Id) {
        let mut pb = self.0.lock();
        pb.on_write.remove(&id);
    }
    /// Register `tx` to receive all publisher [`Event`]s.
    pub fn events(&self, tx: UnboundedSender<Event>) {
        self.0.lock().on_event_chans.push(tx)
    }
    /// Register `tx` to receive [`Event`]s for the value `id` only.
    pub fn events_for_id(&self, id: Id, tx: UnboundedSender<Event>) {
        self.0.lock().on_event_by_id_chans.entry(id).or_insert(vec![]).push(tx);
    }
}
// Background loop that pushes queued publish/unpublish operations to
// the resolver whenever triggered. Each trigger message optionally
// carries a reply channel, signaled once the queues present at trigger
// time have been processed (used by Publisher::flushed). The loop
// exits when the trigger channel closes, i.e. when the publisher's
// inner state is dropped.
async fn publish_loop(
    publisher: PublisherWeak,
    mut trigger_rx: UnboundedReceiver<Option<oneshot::Sender<()>>>,
) {
    while let Some(reply) = trigger_rx.next().await {
        if let Some(publisher) = publisher.upgrade() {
            let mut to_publish;
            let mut to_publish_default;
            let mut to_unpublish;
            let mut to_unpublish_default;
            let mut to_unsubscribe;
            // swap the queues out under the lock, replacing them with
            // fresh pooled containers, so resolver calls below happen
            // without holding the publisher lock
            let resolver = {
                let mut pb = publisher.0.lock();
                to_publish = mem::replace(&mut pb.to_publish, TOPUB.take());
                to_publish_default =
                    mem::replace(&mut pb.to_publish_default, TOPUB.take());
                to_unpublish = mem::replace(&mut pb.to_unpublish, TOUPUB.take());
                to_unpublish_default =
                    mem::replace(&mut pb.to_unpublish_default, TOUPUB.take());
                to_unsubscribe = mem::replace(&mut pb.to_unsubscribe, TOUSUB.take());
                pb.publish_triggered = false;
                pb.resolver.clone()
            };
            // failures are logged and not retried here; the warnings
            // say "will retry" — presumably retry happens downstream in
            // the resolver client (not visible in this file)
            if to_publish.len() > 0 {
                if let Err(e) = resolver.publish_with_flags(to_publish.drain()).await {
                    warn!("failed to publish some paths {} will retry", e);
                }
            }
            if to_publish_default.len() > 0 {
                if let Err(e) =
                    resolver.publish_default_with_flags(to_publish_default.drain()).await
                {
                    warn!("failed to publish_default some paths {} will retry", e)
                }
            }
            if to_unpublish.len() > 0 {
                if let Err(e) = resolver.unpublish(to_unpublish.drain()).await {
                    warn!("failed to unpublish some paths {} will retry", e)
                }
            }
            if to_unpublish_default.len() > 0 {
                if let Err(e) =
                    resolver.unpublish_default(to_unpublish_default.drain()).await
                {
                    warn!("failed to unpublish default some paths {} will retry", e)
                }
            }
            // deliver forced unsubscribes (from destroyed values) to
            // the affected clients via a regular update batch
            if to_unsubscribe.len() > 0 {
                let mut usubs = RAWUNSUBS.take();
                for (id, subs) in to_unsubscribe.drain() {
                    for cl in subs.iter() {
                        usubs.push((*cl, id));
                    }
                }
                let mut batch = publisher.start_batch();
                batch.unsubscribes = Some(usubs);
                batch.commit(None).await;
            }
        }
        // signal flush completion if this trigger requested it
        if let Some(reply) = reply {
            let _ = reply.send(());
        }
    }
}