netidx_container/
lib.rs

1#[macro_use]
2extern crate netidx_protocols;
3
4mod db;
5mod rpcs;
6mod stats;
7
8use crate::rpcs::RpcApi;
9use anyhow::{bail, Result};
10use arcstr::{literal, ArcStr};
11pub use db::{Datum, DatumKind, Db, Reply, Sendable, Txn};
12use derive_builder::Builder;
13use futures::{
14    self,
15    channel::{mpsc, oneshot},
16    future,
17    prelude::*,
18    select_biased,
19    stream::FusedStream,
20};
21use fxhash::FxHashMap;
22use log::{error, info};
23use netidx::{
24    config::Config,
25    path::Path,
26    publisher::{
27        BindCfg, DefaultHandle, Event as PEvent, Id, PublishFlags, Publisher,
28        PublisherBuilder, UpdateBatch, Val, Value, WriteRequest,
29    },
30    resolver_client::DesiredAuth,
31    utils::BatchItem,
32};
33use parking_lot::Mutex;
34use poolshark::global::GPooled;
35use rpcs::{RpcRequest, RpcRequestKind};
36use stats::Stats;
37use std::{
38    collections::{
39        BTreeMap,
40        Bound::{self, *},
41    },
42    mem,
43    ops::{Deref, DerefMut},
44    path::PathBuf,
45    pin::Pin,
46    time::Duration,
47};
48use structopt::StructOpt;
49use tokio::task;
50use triomphe::Arc;
51
// Command line / builder parameters for a container.
//
// NOTE: these are plain `//` comments on purpose: `///` doc comments on a
// `StructOpt` derive are turned into the generated CLI help/about text.
#[derive(StructOpt, Builder, Debug)]
pub struct Params {
    // Publisher bind configuration, passed unchanged to
    // `PublisherBuilder::bind_cfg` when the container starts.
    #[structopt(
        short = "b",
        long = "bind",
        help = "configure the bind address e.g. local, 192.168.0.0/16, 127.0.0.1:5000"
    )]
    #[builder(setter(strip_option), default)]
    pub bind: Option<BindCfg>,
    // Timeout in seconds, converted with `Duration::from_secs` and used
    // when committing publisher update batches.
    #[structopt(
        long = "timeout",
        help = "require subscribers to consume values before timeout (seconds)"
    )]
    #[builder(setter(strip_option), default)]
    pub timeout: Option<u64>,
    // Forwarded to `PublisherBuilder::slack` only when set.
    #[structopt(long = "slack", help = "set the publisher slack (default 3 batches)")]
    #[builder(setter(strip_option), default)]
    pub slack: Option<usize>,
    // Forwarded to `PublisherBuilder::max_clients` only when set.
    #[structopt(
        long = "max_clients",
        help = "set the maximum number of clients (default 768)"
    )]
    #[builder(setter(strip_option), default)]
    pub max_clients: Option<usize>,
    // When set, the RPC interface is published at `<api_path>/rpcs` and the
    // stats publisher is created from this path. When unset, neither exists.
    #[structopt(long = "api-path", help = "the netidx path of the container api")]
    #[builder(setter(strip_option), default)]
    pub api_path: Option<Path>,
    // Database file; consumed by `Db::new`. NOTE(review): presumably falls
    // back to `Params::default_db_path` when unset — confirm in db.rs.
    #[structopt(long = "db", help = "the db file")]
    #[builder(setter(strip_option), default)]
    pub db: Option<String>,
    // Database page cache size in bytes; consumed by `Db::new`.
    #[structopt(long = "cache-size", help = "db page cache size in bytes")]
    #[builder(setter(strip_option), default)]
    pub cache_size: Option<u64>,
    // When true, data paths are never advertised on the root default
    // publishers (see `ContainerInner::advertise_path`).
    #[structopt(long = "sparse", help = "don't even advertise the contents of the db")]
    #[builder(default = "false")]
    pub sparse: bool,
}
89
90impl Params {
91    pub fn default_db_path() -> Option<PathBuf> {
92        dirs::data_dir().map(|mut p| {
93            p.push("netidx");
94            p.push("container");
95            p.push("db");
96            p
97        })
98    }
99}
100
// Evaluate `$r`, yielding its `Ok` value. On `Err`, if `$reply` is `Some`,
// send the error's `Display` text back as a `Value::error`, then `return`
// from the enclosing function (which must therefore return `()`).
// `$reply` is only evaluated on the error path.
macro_rules! or_reply {
    ($reply:expr, $r:expr) => {
        match $r {
            Err(err) => {
                if let Some(reply) = $reply {
                    reply.send(Value::error(format!("{}", err)));
                }
                return;
            }
            Ok(ok) => ok,
        }
    };
}
115
/// A value currently published by the container. It is shared (via `Arc`)
/// between the `by_id` and `by_path` indexes of `ContainerInner`.
struct Published {
    // the path the value is published at
    path: Path,
    // the publisher's handle for the value, used to push updates
    val: Val,
}
120
/// The default publisher handle for each container root, keyed by root
/// path. Polled as a stream, it yields `(path, done)` publish requests
/// coming from any of the roots.
#[must_use = "streams do nothing unless polled"]
struct Roots(BTreeMap<Path, DefaultHandle>);
123
124impl Deref for Roots {
125    type Target = BTreeMap<Path, DefaultHandle>;
126
127    fn deref(&self) -> &Self::Target {
128        &self.0
129    }
130}
131
132impl DerefMut for Roots {
133    fn deref_mut(&mut self) -> &mut Self::Target {
134        &mut self.0
135    }
136}
137
138impl<'a> Stream for Roots {
139    type Item = (Path, oneshot::Sender<()>);
140
141    fn poll_next(
142        mut self: Pin<&mut Self>,
143        cx: &mut std::task::Context<'_>,
144    ) -> std::task::Poll<Option<Self::Item>> {
145        use std::task::Poll;
146        for dh in self.values_mut() {
147            match Pin::new(&mut *dh).poll_next(cx) {
148                Poll::Pending => (),
149                r @ Poll::Ready(_) => return r,
150            }
151        }
152        Poll::Pending
153    }
154}
155
156impl<'a> FusedStream for Roots {
157    fn is_terminated(&self) -> bool {
158        self.values().all(|ch| ch.is_terminated())
159    }
160}
161
/// All container state. It is owned by the background task spawned in
/// `Container::start`, which drives it through `ContainerInner::run`.
struct ContainerInner {
    // the parameters the container was started with (timeout and sparse
    // are consulted while running)
    params: Params,
    // the backing database
    db: Db,
    // the publisher used for roots, data, the RPC interface and stats
    publisher: Publisher,
    // `<params.api_path>/rpcs`, when an api path was configured
    api_path: Option<Path>,
    // stats publisher, present only when an api path was configured
    stats: Option<Stats>,
    // explicit lock markers: `true` = locked, `false` = unlocked override
    // beneath a locked parent (see `is_locked_gen` / `process_update`)
    locked: BTreeMap<Path, bool>,
    // registered with the publisher for every published value, so that
    // client writes arrive on `write_updates_rx`
    write_updates_tx: mpsc::Sender<GPooled<Vec<WriteRequest>>>,
    write_updates_rx: mpsc::Receiver<GPooled<Vec<WriteRequest>>>,
    // currently published values, indexed by publisher id and by path
    by_id: FxHashMap<Id, Arc<Published>>,
    by_path: FxHashMap<Path, Arc<Published>>,
    // subscribe/unsubscribe/destroyed events from the publisher
    publish_events: mpsc::UnboundedReceiver<PEvent>,
    // default publishers for each root; yields publish requests
    roots: Roots,
    // change notifications produced by the database
    db_updates: mpsc::UnboundedReceiver<db::Update>,
    // the RPC interface, when an api path was configured
    api: Option<rpcs::RpcApi>,
}
178
179impl ContainerInner {
180    async fn new(cfg: Config, auth: DesiredAuth, params: Params) -> Result<Self> {
181        let (publish_events_tx, publish_events) = mpsc::unbounded();
182        let mut publisher = PublisherBuilder::new(cfg.clone());
183        publisher.desired_auth(auth.clone()).bind_cfg(params.bind);
184        if let Some(n) = params.slack {
185            publisher.slack(n);
186        }
187        if let Some(n) = params.max_clients {
188            publisher.max_clients(n);
189        }
190        let publisher = publisher.build().await?;
191        publisher.events(publish_events_tx);
192        let (db, db_updates) =
193            Db::new(&params, publisher.clone(), params.api_path.clone())?;
194        let (write_updates_tx, write_updates_rx) = mpsc::channel(3);
195        let (api_path, api) = match params.api_path.as_ref() {
196            None => (None, None),
197            Some(api_path) => {
198                let api_path = api_path.append("rpcs");
199                let api = rpcs::RpcApi::new(&publisher, &api_path)?;
200                (Some(api_path), Some(api))
201            }
202        };
203        let stats = match params.api_path.as_ref() {
204            None => None,
205            Some(p) => Some(Stats::new(publisher.clone(), p.clone())),
206        };
207        Ok(Self {
208            params,
209            api_path,
210            stats,
211            locked: BTreeMap::new(),
212            roots: Roots(BTreeMap::new()),
213            db,
214            publisher,
215            write_updates_tx,
216            write_updates_rx,
217            by_id: FxHashMap::default(),
218            by_path: FxHashMap::default(),
219            publish_events,
220            db_updates,
221            api,
222        })
223    }
224
225    fn get_root(&self, path: &Path) -> Option<(&Path, &DefaultHandle)> {
226        match self
227            .roots
228            .range::<str, (Bound<&str>, Bound<&str>)>((
229                Unbounded,
230                Included(path.as_ref()),
231            ))
232            .next_back()
233        {
234            None => None,
235            Some((prev, def)) => {
236                if path.starts_with(prev.as_ref()) {
237                    Some((prev, def))
238                } else {
239                    None
240                }
241            }
242        }
243    }
244
245    fn check_path(&self, path: Path) -> Result<Path> {
246        if self.get_root(&path).is_some() {
247            Ok(path)
248        } else {
249            bail!("non root path")
250        }
251    }
252
253    fn publish_data(&mut self, path: Path, value: Value) -> Result<()> {
254        self.advertise_path(path.clone());
255        let val = self.publisher.publish_with_flags(
256            PublishFlags::DESTROY_ON_IDLE,
257            path.clone(),
258            value.clone(),
259        )?;
260        let id = val.id();
261        self.publisher.writes(val.id(), self.write_updates_tx.clone());
262        let published = Arc::new(Published { path: path.clone(), val });
263        self.by_id.insert(id, published.clone());
264        self.by_path.insert(path.clone(), published);
265        Ok(())
266    }
267
268    async fn init(&mut self) -> Result<()> {
269        let mut batch = self.publisher.start_batch();
270        for res in self.db.roots() {
271            let path = res?;
272            let def = self.publisher.publish_default(path.clone())?;
273            self.roots.insert(path, def);
274        }
275        if let Some(stats) = &mut self.stats {
276            let _ = stats.set_roots(&mut batch, &self.roots);
277        }
278        for res in self.db.locked() {
279            let (path, locked) = res?;
280            let path = self.check_path(path)?;
281            self.locked.insert(path, locked);
282        }
283        if let Some(stats) = &mut self.stats {
284            let _ = stats.set_locked(&mut batch, &self.locked);
285        }
286        for res in self.db.iter() {
287            let (path, kind, _) = res?;
288            match kind {
289                DatumKind::Data | DatumKind::Formula => {
290                    let path = self.check_path(path)?;
291                    self.advertise_path(path);
292                }
293                DatumKind::Deleted | DatumKind::Invalid => (),
294            }
295        }
296        Ok(batch.commit(self.params.timeout.map(Duration::from_secs)).await)
297    }
298
299    fn process_writes(&mut self, txn: &mut Txn, mut writes: GPooled<Vec<WriteRequest>>) {
300        // CR estokes: log this
301        for req in writes.drain(..) {
302            let reply = req.send_result.map(Sendable::Write);
303            if let Some(p) = self.by_id.get(&req.id) {
304                txn.set_data(true, p.path.clone(), req.value, reply);
305            }
306        }
307    }
308
309    fn is_locked_gen(&self, path: &Path, parent_only: bool) -> bool {
310        let mut iter = self.locked.range::<str, (Bound<&str>, Bound<&str>)>((
311            Bound::Unbounded,
312            if parent_only {
313                Bound::Excluded(path.as_ref())
314            } else {
315                Bound::Included(path.as_ref())
316            },
317        ));
318        loop {
319            match iter.next_back() {
320                None => break false,
321                Some((p, locked)) if Path::is_parent(p, &path) => break *locked,
322                Some(_) => (),
323            }
324        }
325    }
326
327    fn is_locked(&self, path: &Path) -> bool {
328        self.is_locked_gen(path, false)
329    }
330
331    fn process_publish_request(&mut self, path: Path, reply: oneshot::Sender<()>) {
332        match self.check_path(path) {
333            Err(_) => {
334                // this should not be possible, but in case of a bug, just do nothing
335                let _: Result<_, _> = reply.send(());
336            }
337            Ok(path) => {
338                match self.db.lookup(path.as_ref()) {
339                    Ok(Some(Datum::Data(v) | Datum::Formula(v, _))) => {
340                        let _: Result<()> = self.publish_data(path, v);
341                    }
342                    Err(_) | Ok(Some(Datum::Deleted)) | Ok(None) => {
343                        let locked = self.is_locked(&path);
344                        let api = self
345                            .api_path
346                            .as_ref()
347                            .map(|p| Path::is_parent(p, &path))
348                            .unwrap_or(false);
349                        if !locked && !api {
350                            let _: Result<()> = self.publish_data(path, Value::Null);
351                        };
352                    }
353                }
354                let _: Result<_, _> = reply.send(());
355            }
356        }
357    }
358
359    fn process_publish_event(&mut self, e: PEvent) {
360        match e {
361            PEvent::Subscribe(_, _) | PEvent::Unsubscribe(_, _) => (),
362            PEvent::Destroyed(id) => {
363                if let Some(p) = self.by_id.remove(&id) {
364                    self.by_path.remove(&p.path);
365                }
366            }
367        }
368    }
369
370    fn delete_path(&mut self, txn: &mut Txn, path: Path, reply: Reply) {
371        let path = or_reply!(reply, self.check_path(path));
372        let bn = Path::basename(&path);
373        if bn == Some(".formula") || bn == Some(".on-write") {
374            if let Some(path) = Path::dirname(&path) {
375                txn.remove(Path::from(ArcStr::from(path)), reply);
376            }
377        } else {
378            txn.remove(path, reply);
379        }
380    }
381
382    fn delete_subtree(&mut self, txn: &mut Txn, path: Path, reply: Reply) {
383        let path = or_reply!(reply, self.check_path(path));
384        txn.remove_subtree(path, reply);
385    }
386
387    fn lock_subtree(&mut self, txn: &mut Txn, path: Path, reply: Reply) {
388        let path = or_reply!(reply, self.check_path(path));
389        txn.set_locked(path, reply);
390    }
391
392    fn unlock_subtree(&mut self, txn: &mut Txn, path: Path, reply: Reply) {
393        let path = or_reply!(reply, self.check_path(path));
394        txn.set_unlocked(path, reply);
395    }
396
397    fn set_data(&mut self, txn: &mut Txn, path: Path, value: Value, reply: Reply) {
398        let path = or_reply!(reply, self.check_path(path));
399        txn.set_data(true, path, value, reply);
400    }
401
402    fn create_sheet(
403        &self,
404        txn: &mut Txn,
405        path: Path,
406        rows: usize,
407        columns: usize,
408        max_rows: usize,
409        max_columns: usize,
410        lock: bool,
411        reply: Reply,
412    ) {
413        let path = or_reply!(reply, self.check_path(path));
414        if rows > max_rows || columns > max_columns {
415            let m = literal!("rows <= max_rows && columns <= max_columns");
416            if let Some(reply) = reply {
417                reply.send(Value::error(m));
418            }
419        } else {
420            txn.create_sheet(path, rows, columns, max_rows, max_columns, lock, reply);
421        }
422    }
423
424    fn create_table(
425        &self,
426        txn: &mut Txn,
427        path: Path,
428        rows: Vec<ArcStr>,
429        columns: Vec<ArcStr>,
430        lock: bool,
431        reply: Reply,
432    ) {
433        let path = or_reply!(reply, self.check_path(path));
434        txn.create_table(path, rows, columns, lock, reply);
435    }
436
437    fn process_rpc_requests(&mut self, txn: &mut Txn, reqs: &mut Vec<RpcRequest>) {
438        let mut process_non_packed = |reply: Sendable, req: RpcRequestKind| match req {
439            RpcRequestKind::Delete(path) => self.delete_path(txn, path, Some(reply)),
440            RpcRequestKind::DeleteSubtree(path) => {
441                self.delete_subtree(txn, path, Some(reply))
442            }
443            RpcRequestKind::LockSubtree(path) => {
444                self.lock_subtree(txn, path, Some(reply))
445            }
446            RpcRequestKind::UnlockSubtree(path) => {
447                self.unlock_subtree(txn, path, Some(reply))
448            }
449            RpcRequestKind::SetData { path, value } => {
450                self.set_data(txn, path, value, Some(reply))
451            }
452            RpcRequestKind::CreateSheet {
453                path,
454                rows,
455                columns,
456                max_rows,
457                max_columns,
458                lock,
459            } => self.create_sheet(
460                txn,
461                path,
462                rows,
463                columns,
464                max_rows,
465                max_columns,
466                lock,
467                Some(reply),
468            ),
469            RpcRequestKind::AddSheetRows(path, rows) => {
470                txn.add_sheet_rows(path, rows, Some(reply));
471            }
472            RpcRequestKind::AddSheetCols(path, cols) => {
473                txn.add_sheet_columns(path, cols, Some(reply));
474            }
475            RpcRequestKind::DelSheetRows(path, rows) => {
476                txn.del_sheet_rows(path, rows, Some(reply));
477            }
478            RpcRequestKind::DelSheetCols(path, cols) => {
479                txn.del_sheet_columns(path, cols, Some(reply));
480            }
481            RpcRequestKind::CreateTable { path, rows, columns, lock } => {
482                self.create_table(txn, path, rows, columns, lock, Some(reply))
483            }
484            RpcRequestKind::AddTableRows(path, rows) => {
485                txn.add_table_rows(path, rows, Some(reply));
486            }
487            RpcRequestKind::AddTableCols(path, cols) => {
488                txn.add_table_columns(path, cols, Some(reply));
489            }
490            RpcRequestKind::DelTableRows(path, rows) => {
491                txn.del_table_rows(path, rows, Some(reply));
492            }
493            RpcRequestKind::DelTableCols(path, cols) => {
494                txn.del_table_columns(path, cols, Some(reply));
495            }
496            RpcRequestKind::AddRoot(path) => {
497                txn.add_root(path, Some(reply));
498            }
499            RpcRequestKind::DelRoot(path) => {
500                txn.del_root(path, Some(reply));
501            }
502            RpcRequestKind::Packed(_) => unreachable!(),
503        };
504        for mut req in reqs.drain(..) {
505            match req.kind {
506                RpcRequestKind::Packed(reqs) => {
507                    let res = Arc::new(Mutex::new(Value::Null));
508                    for req in reqs {
509                        let reply = Sendable::Packed(res.clone());
510                        process_non_packed(reply, req)
511                    }
512                    req.reply.send(mem::replace(&mut *res.lock(), Value::Null));
513                }
514                k => {
515                    let reply = Sendable::Rpc(req.reply);
516                    process_non_packed(reply, k)
517                }
518            }
519        }
520    }
521
522    fn remove_deleted_published(&mut self, path: &Path) {
523        self.remove_advertisement(&path);
524        if let Some(p) = self.by_path.remove(path) {
525            self.by_id.remove(&p.val.id());
526        }
527    }
528
529    fn remove_advertisement(&self, path: &Path) {
530        if let Some((_, def)) = self.get_root(&path) {
531            def.remove_advertisement(&path);
532        }
533    }
534
535    fn advertise_path(&self, path: Path) {
536        if !self.params.sparse {
537            if let Some((_, def)) = self.get_root(&path) {
538                let _: Result<_> = def.advertise(path);
539            }
540        }
541    }
542
543    fn process_update(&mut self, batch: &mut UpdateBatch, mut update: db::Update) {
544        use db::UpdateKind;
545        let mut locked = false;
546        let mut roots = false;
547        for path in update.added_roots.drain(..) {
548            roots = true;
549            match self.publisher.publish_default(path.clone()) {
550                Err(e) => error!("failed to publish_default {path} {e:?}"),
551                Ok(dh) => {
552                    self.roots.insert(path, dh);
553                }
554            }
555        }
556        for path in update.removed_roots.drain(..) {
557            roots = true;
558            self.roots.remove(&path);
559        }
560        for (path, value) in update.data.drain(..) {
561            match value {
562                UpdateKind::Updated(v) => {
563                    if let Some(p) = self.by_path.get(&path) {
564                        p.val.update(batch, v)
565                    }
566                }
567                UpdateKind::Inserted(v) => match self.by_path.get(&path) {
568                    Some(p) => p.val.update(batch, v),
569                    None => self.advertise_path(path),
570                },
571                UpdateKind::Deleted => self.remove_deleted_published(&path),
572            }
573        }
574        for path in update.locked.drain(..) {
575            locked = true;
576            if self.is_locked_gen(&path, true) {
577                self.locked.remove(&path);
578            } else {
579                self.locked.insert(path, true);
580            }
581        }
582        for path in update.unlocked.drain(..) {
583            locked = true;
584            if !self.is_locked_gen(&path, true) {
585                self.locked.remove(&path);
586            } else {
587                self.locked.insert(path, false);
588            }
589        }
590        if locked {
591            // CR estokes: log this
592            if let Some(stats) = &mut self.stats {
593                let _: Result<_> = stats.set_locked(batch, &self.locked);
594            }
595        }
596        if roots {
597            // CR estokes: log this
598            if let Some(stats) = &mut self.stats {
599                let _: Result<_> = stats.set_roots(batch, &self.roots);
600            }
601        }
602    }
603
604    fn process_command(&mut self, txn: &mut Txn, c: ToInner) {
605        match c {
606            ToInner::GetDb(s) => {
607                let _ = s.send(self.db.clone());
608            }
609            ToInner::GetPub(s) => {
610                let _ = s.send(self.publisher.clone());
611            }
612            ToInner::Commit(t, s) => {
613                *txn = t;
614                let _ = s.send(());
615            }
616        }
617    }
618
619    async fn run(mut self, mut cmd: mpsc::UnboundedReceiver<ToInner>) -> Result<()> {
620        let mut rpcbatch = Vec::new();
621        let mut batch = self.publisher.start_batch();
622        let mut txn = Txn::new();
623        async fn api_rx(api: &mut Option<RpcApi>) -> BatchItem<RpcRequest> {
624            match api {
625                Some(api) => api.rx.select_next_some().await,
626                None => future::pending().await,
627            }
628        }
629        loop {
630            select_biased! {
631                r = self.publish_events.select_next_some() => {
632                    self.process_publish_event(r);
633                },
634                r = self.roots.select_next_some() => {
635                    self.process_publish_request(r.0, r.1)
636                },
637                r = api_rx(&mut self.api).fuse() => match r {
638                    BatchItem::InBatch(v) => rpcbatch.push(v),
639                    BatchItem::EndBatch => self.process_rpc_requests(&mut txn, &mut rpcbatch)
640                },
641                u = self.db_updates.select_next_some() => {
642                    self.process_update(&mut batch, u);
643                },
644                c = cmd.select_next_some() => {
645                    self.process_command(&mut txn, c);
646                },
647                w = self.write_updates_rx.select_next_some() => {
648                    self.process_writes(&mut txn, w);
649                }
650                complete => break,
651            }
652            if txn.dirty() {
653                self.db.commit(mem::replace(&mut txn, Txn::new()));
654            }
655            if batch.len() > 0 {
656                let timeout = self.params.timeout.map(Duration::from_secs);
657                let new_batch = self.publisher.start_batch();
658                mem::replace(&mut batch, new_batch).commit(timeout).await;
659            }
660        }
661        self.publisher.clone().shutdown().await;
662        self.db.flush_async().await?;
663        Ok(())
664    }
665}
666
/// Commands sent from a `Container` handle to the container's background
/// task.
enum ToInner {
    /// Ask for a clone of the database handle.
    GetDb(oneshot::Sender<Db>),
    /// Ask for a clone of the publisher.
    GetPub(oneshot::Sender<Publisher>),
    /// Submit a transaction. The sender is signalled once the background
    /// task has taken the transaction over.
    Commit(Txn, oneshot::Sender<()>),
}
672
/// A cloneable handle to a running container. Every operation is sent as a
/// message over an unbounded channel to the container's background task.
#[derive(Clone)]
pub struct Container(mpsc::UnboundedSender<ToInner>);
675
676impl Container {
677    /// Start the container with the specified
678    /// parameters. Initialization is performed on the calling task,
679    /// and an error is returned if it fails. Once initialization is
680    /// complete, the container continues to run on a background task,
681    /// and errors are reported by log::error!.
682    pub async fn start(
683        cfg: Config,
684        auth: DesiredAuth,
685        params: Params,
686    ) -> Result<Container> {
687        let (w, r) = mpsc::unbounded();
688        let mut c = ContainerInner::new(cfg, auth, params).await?;
689        c.init().await?;
690        task::spawn(async move {
691            match c.run(r).await {
692                Err(e) => error!("container stopped with error {}", e),
693                Ok(()) => info!("container stopped gracefully"),
694            }
695        });
696        Ok(Container(w))
697    }
698
699    /// Fetch the database associated with the container. You can
700    /// perform any read operations you like on the database, however
701    /// keep in mind that clients can write while you are
702    /// reading. Transactions must be submitted to the main task to
703    /// ensure some level of coordination between client writes and
704    /// application transactions.
705    pub async fn db(&self) -> Result<Db> {
706        let (w, r) = oneshot::channel();
707        self.0.unbounded_send(ToInner::GetDb(w))?;
708        Ok(r.await?)
709    }
710
711    /// Submit a database transaction to be processed, wait for it to
712    /// be accepted.
713    pub async fn commit(&self, txn: Txn) -> Result<()> {
714        let (w, r) = oneshot::channel();
715        self.0.unbounded_send(ToInner::Commit(txn, w))?;
716        Ok(r.await?)
717    }
718
719    /// Submit a database transaction without waiting
720    pub fn commit_unbounded(&self, txn: Txn) -> Result<()> {
721        let (w, _r) = oneshot::channel();
722        Ok(self.0.unbounded_send(ToInner::Commit(txn, w))?)
723    }
724
725    /// Retreive the container's publisher.
726    pub async fn publisher(&self) -> Result<Publisher> {
727        let (w, r) = oneshot::channel();
728        self.0.unbounded_send(ToInner::GetPub(w))?;
729        Ok(r.await?)
730    }
731}