Skip to main content

netidx_container/
lib.rs

1#[macro_use]
2extern crate netidx_protocols;
3
4mod db;
5mod rpcs;
6mod stats;
7
8use crate::rpcs::RpcApi;
9use ahash::AHashMap;
10use anyhow::{bail, Result};
11use arcstr::{literal, ArcStr};
12pub use db::{Datum, DatumKind, Db, Reply, Sendable, Txn};
13use derive_builder::Builder;
14use futures::{
15    self,
16    channel::{mpsc, oneshot},
17    future,
18    prelude::*,
19    select_biased,
20    stream::FusedStream,
21};
22use log::{error, info};
23use netidx::{
24    config::Config,
25    path::Path,
26    publisher::{
27        BindCfg, DefaultHandle, Event as PEvent, Id, PublishFlags, Publisher,
28        PublisherBuilder, UpdateBatch, Val, Value, WriteRequest,
29    },
30    resolver_client::DesiredAuth,
31    utils::BatchItem,
32};
33use nohash::IntMap;
34use parking_lot::Mutex;
35use poolshark::global::GPooled;
36use rpcs::{RpcRequest, RpcRequestKind};
37use stats::Stats;
38use std::{
39    collections::{
40        BTreeMap,
41        Bound::{self, *},
42    },
43    mem,
44    ops::{Deref, DerefMut},
45    path::PathBuf,
46    pin::Pin,
47    time::Duration,
48};
49use structopt::StructOpt;
50use tokio::task;
51use triomphe::Arc;
52
// Parameters for starting a container. They can be parsed from the
// command line (StructOpt) or assembled in code with the derived builder.
// Plain `//` comments are used here on purpose: `///` doc comments would be
// picked up by StructOpt as CLI help/about text.
#[derive(StructOpt, Builder, Debug)]
pub struct Params {
    // Bind configuration for the publisher; passed unchanged to
    // `PublisherBuilder::bind_cfg` in `ContainerInner::new`.
    #[structopt(
        short = "b",
        long = "bind",
        help = "configure the bind address e.g. local, 192.168.0.0/16, 127.0.0.1:5000"
    )]
    #[builder(setter(strip_option), default)]
    pub bind: Option<BindCfg>,
    // Seconds, converted with `Duration::from_secs` and used as the timeout
    // whenever an update batch is committed.
    #[structopt(
        long = "timeout",
        help = "require subscribers to consume values before timeout (seconds)"
    )]
    #[builder(setter(strip_option), default)]
    pub timeout: Option<u64>,
    // When set, forwarded to `PublisherBuilder::slack`.
    #[structopt(long = "slack", help = "set the publisher slack (default 3 batches)")]
    #[builder(setter(strip_option), default)]
    pub slack: Option<usize>,
    // When set, forwarded to `PublisherBuilder::max_clients`.
    // NOTE(review): this flag uses an underscore while the others use
    // hyphens; changing it would break existing command lines.
    #[structopt(
        long = "max_clients",
        help = "set the maximum number of clients (default 768)"
    )]
    #[builder(setter(strip_option), default)]
    pub max_clients: Option<usize>,
    // When set, the rpc api is published under `<api_path>/rpcs` and the
    // stats object is created with this path; it is also passed to `Db::new`.
    #[structopt(long = "api-path", help = "the netidx path of the container api")]
    #[builder(setter(strip_option), default)]
    pub api_path: Option<Path>,
    // The database file, consumed by `Db::new`.
    // NOTE(review): presumably `Db::new` falls back to
    // `Params::default_db_path` when this is `None` — confirm in db.rs.
    #[structopt(long = "db", help = "the db file")]
    #[builder(setter(strip_option), default)]
    pub db: Option<String>,
    // Database page cache size in bytes, consumed by `Db::new`.
    #[structopt(long = "cache-size", help = "db page cache size in bytes")]
    #[builder(setter(strip_option), default)]
    pub cache_size: Option<u64>,
    // When true, `ContainerInner::advertise_path` does nothing, so stored
    // paths are not advertised on the root default publishers.
    #[structopt(long = "sparse", help = "don't even advertise the contents of the db")]
    #[builder(default = "false")]
    pub sparse: bool,
}
90
91impl Params {
92    pub fn default_db_path() -> Option<PathBuf> {
93        dirs::data_dir().map(|mut p| {
94            p.push("netidx");
95            p.push("container");
96            p.push("db");
97            p
98        })
99    }
100}
101
// Unwrap a `Result` inside a function returning `()`.
//
// On `Ok(v)` the macro evaluates to `v`. On `Err(err)` the error is rendered
// with `Display`, wrapped in `Value::error`, sent to `$reply` if it is
// `Some`, and the enclosing function returns immediately.
macro_rules! or_reply {
    ($reply:expr, $r:expr) => {
        match $r {
            Err(err) => {
                if let Some(reply) = $reply {
                    reply.send(Value::error(err.to_string()));
                }
                return;
            }
            Ok(v) => v,
        }
    };
}
116
/// A value published for a database path. Each one is shared between the
/// `by_id` and `by_path` indices of `ContainerInner`.
struct Published {
    /// the path the value is published at
    path: Path,
    /// the publisher handle used to update the value
    val: Val,
}
121
/// The default-publisher handle for every database root, keyed by root path.
/// As a `Stream`, it yields the `(path, ack)` publish requests that arrive
/// on any of the handles.
#[must_use = "streams do nothing unless polled"]
struct Roots(BTreeMap<Path, DefaultHandle>);
124
125impl Deref for Roots {
126    type Target = BTreeMap<Path, DefaultHandle>;
127
128    fn deref(&self) -> &Self::Target {
129        &self.0
130    }
131}
132
133impl DerefMut for Roots {
134    fn deref_mut(&mut self) -> &mut Self::Target {
135        &mut self.0
136    }
137}
138
139impl<'a> Stream for Roots {
140    type Item = (Path, oneshot::Sender<()>);
141
142    fn poll_next(
143        mut self: Pin<&mut Self>,
144        cx: &mut std::task::Context<'_>,
145    ) -> std::task::Poll<Option<Self::Item>> {
146        use std::task::Poll;
147        for dh in self.values_mut() {
148            match Pin::new(&mut *dh).poll_next(cx) {
149                Poll::Pending => (),
150                r @ Poll::Ready(_) => return r,
151            }
152        }
153        Poll::Pending
154    }
155}
156
157impl<'a> FusedStream for Roots {
158    fn is_terminated(&self) -> bool {
159        self.values().all(|ch| ch.is_terminated())
160    }
161}
162
/// The state owned by the background task that runs a container. Its
/// methods are driven by `ContainerInner::run`.
struct ContainerInner {
    /// the parameters the container was started with
    params: Params,
    /// the container's database
    db: Db,
    /// the publisher used for roots, data values, rpcs and stats
    publisher: Publisher,
    /// `<api_path>/rpcs` when an api path was configured, otherwise `None`
    api_path: Option<Path>,
    /// stats, created only when an api path was configured
    stats: Option<Stats>,
    /// lock entries by path: `true` locks a subtree, `false` unlocks a
    /// subtree beneath a locked parent
    locked: BTreeMap<Path, bool>,
    /// cloned into `Publisher::writes` for every value we publish
    write_updates_tx: mpsc::Sender<GPooled<Vec<WriteRequest>>>,
    /// client write requests, consumed by `process_writes`
    write_updates_rx: mpsc::Receiver<GPooled<Vec<WriteRequest>>>,
    /// published values indexed by publisher id
    by_id: IntMap<Id, Arc<Published>>,
    /// published values indexed by path
    by_path: AHashMap<Path, Arc<Published>>,
    /// events reported by the publisher (e.g. `Destroyed`)
    publish_events: mpsc::UnboundedReceiver<PEvent>,
    /// default publisher handles for every database root
    roots: Roots,
    /// change notifications from the database, applied by `process_update`
    db_updates: mpsc::UnboundedReceiver<db::Update>,
    /// the rpc api, present only when an api path was configured
    api: Option<rpcs::RpcApi>,
}
179
180impl ContainerInner {
181    async fn new(cfg: Config, auth: DesiredAuth, params: Params) -> Result<Self> {
182        let (publish_events_tx, publish_events) = mpsc::unbounded();
183        let mut publisher = PublisherBuilder::new(cfg.clone());
184        publisher.desired_auth(auth.clone()).bind_cfg(params.bind);
185        if let Some(n) = params.slack {
186            publisher.slack(n);
187        }
188        if let Some(n) = params.max_clients {
189            publisher.max_clients(n);
190        }
191        let publisher = publisher.build().await?;
192        publisher.events(publish_events_tx);
193        let (db, db_updates) =
194            Db::new(&params, publisher.clone(), params.api_path.clone())?;
195        let (write_updates_tx, write_updates_rx) = mpsc::channel(3);
196        let (api_path, api) = match params.api_path.as_ref() {
197            None => (None, None),
198            Some(api_path) => {
199                let api_path = api_path.append("rpcs");
200                let api = rpcs::RpcApi::new(&publisher, &api_path)?;
201                (Some(api_path), Some(api))
202            }
203        };
204        let stats = match params.api_path.as_ref() {
205            None => None,
206            Some(p) => Some(Stats::new(publisher.clone(), p.clone())),
207        };
208        Ok(Self {
209            params,
210            api_path,
211            stats,
212            locked: BTreeMap::new(),
213            roots: Roots(BTreeMap::new()),
214            db,
215            publisher,
216            write_updates_tx,
217            write_updates_rx,
218            by_id: IntMap::default(),
219            by_path: AHashMap::default(),
220            publish_events,
221            db_updates,
222            api,
223        })
224    }
225
226    fn get_root(&self, path: &Path) -> Option<(&Path, &DefaultHandle)> {
227        match self
228            .roots
229            .range::<str, (Bound<&str>, Bound<&str>)>((
230                Unbounded,
231                Included(path.as_ref()),
232            ))
233            .next_back()
234        {
235            None => None,
236            Some((prev, def)) => {
237                if path.starts_with(prev.as_ref()) {
238                    Some((prev, def))
239                } else {
240                    None
241                }
242            }
243        }
244    }
245
246    fn check_path(&self, path: Path) -> Result<Path> {
247        if self.get_root(&path).is_some() {
248            Ok(path)
249        } else {
250            bail!("non root path")
251        }
252    }
253
254    fn publish_data(&mut self, path: Path, value: Value) -> Result<()> {
255        self.advertise_path(path.clone());
256        let val = self.publisher.publish_with_flags(
257            PublishFlags::DESTROY_ON_IDLE,
258            path.clone(),
259            value.clone(),
260        )?;
261        let id = val.id();
262        self.publisher.writes(val.id(), self.write_updates_tx.clone());
263        let published = Arc::new(Published { path: path.clone(), val });
264        self.by_id.insert(id, published.clone());
265        self.by_path.insert(path.clone(), published);
266        Ok(())
267    }
268
269    async fn init(&mut self) -> Result<()> {
270        let mut batch = self.publisher.start_batch();
271        for res in self.db.roots() {
272            let path = res?;
273            let def = self.publisher.publish_default(path.clone())?;
274            self.roots.insert(path, def);
275        }
276        if let Some(stats) = &mut self.stats {
277            let _ = stats.set_roots(&mut batch, &self.roots);
278        }
279        for res in self.db.locked() {
280            let (path, locked) = res?;
281            let path = self.check_path(path)?;
282            self.locked.insert(path, locked);
283        }
284        if let Some(stats) = &mut self.stats {
285            let _ = stats.set_locked(&mut batch, &self.locked);
286        }
287        for res in self.db.iter() {
288            let (path, kind, _) = res?;
289            match kind {
290                DatumKind::Data | DatumKind::Formula => {
291                    let path = self.check_path(path)?;
292                    self.advertise_path(path);
293                }
294                DatumKind::Deleted | DatumKind::Invalid => (),
295            }
296        }
297        Ok(batch.commit(self.params.timeout.map(Duration::from_secs)).await)
298    }
299
300    fn process_writes(&mut self, txn: &mut Txn, mut writes: GPooled<Vec<WriteRequest>>) {
301        // CR estokes: log this
302        for req in writes.drain(..) {
303            let reply = req.send_result.map(Sendable::Write);
304            if let Some(p) = self.by_id.get(&req.id) {
305                txn.set_data(true, p.path.clone(), req.value, reply);
306            }
307        }
308    }
309
310    fn is_locked_gen(&self, path: &Path, parent_only: bool) -> bool {
311        let mut iter = self.locked.range::<str, (Bound<&str>, Bound<&str>)>((
312            Bound::Unbounded,
313            if parent_only {
314                Bound::Excluded(path.as_ref())
315            } else {
316                Bound::Included(path.as_ref())
317            },
318        ));
319        loop {
320            match iter.next_back() {
321                None => break false,
322                Some((p, locked)) if Path::is_parent(p, &path) => break *locked,
323                Some(_) => (),
324            }
325        }
326    }
327
328    fn is_locked(&self, path: &Path) -> bool {
329        self.is_locked_gen(path, false)
330    }
331
332    fn process_publish_request(&mut self, path: Path, reply: oneshot::Sender<()>) {
333        match self.check_path(path) {
334            Err(_) => {
335                // this should not be possible, but in case of a bug, just do nothing
336                let _: Result<_, _> = reply.send(());
337            }
338            Ok(path) => {
339                match self.db.lookup(path.as_ref()) {
340                    Ok(Some(Datum::Data(v) | Datum::Formula(v, _))) => {
341                        let _: Result<()> = self.publish_data(path, v);
342                    }
343                    Err(_) | Ok(Some(Datum::Deleted)) | Ok(None) => {
344                        let locked = self.is_locked(&path);
345                        let api = self
346                            .api_path
347                            .as_ref()
348                            .map(|p| Path::is_parent(p, &path))
349                            .unwrap_or(false);
350                        if !locked && !api {
351                            let _: Result<()> = self.publish_data(path, Value::Null);
352                        };
353                    }
354                }
355                let _: Result<_, _> = reply.send(());
356            }
357        }
358    }
359
360    fn process_publish_event(&mut self, e: PEvent) {
361        match e {
362            PEvent::Subscribe(_, _) | PEvent::Unsubscribe(_, _) => (),
363            PEvent::Destroyed(id) => {
364                if let Some(p) = self.by_id.remove(&id) {
365                    self.by_path.remove(&p.path);
366                }
367            }
368        }
369    }
370
371    fn delete_path(&mut self, txn: &mut Txn, path: Path, reply: Reply) {
372        let path = or_reply!(reply, self.check_path(path));
373        let bn = Path::basename(&path);
374        if bn == Some(".formula") || bn == Some(".on-write") {
375            if let Some(path) = Path::dirname(&path) {
376                txn.remove(Path::from(ArcStr::from(path)), reply);
377            }
378        } else {
379            txn.remove(path, reply);
380        }
381    }
382
383    fn delete_subtree(&mut self, txn: &mut Txn, path: Path, reply: Reply) {
384        let path = or_reply!(reply, self.check_path(path));
385        txn.remove_subtree(path, reply);
386    }
387
388    fn lock_subtree(&mut self, txn: &mut Txn, path: Path, reply: Reply) {
389        let path = or_reply!(reply, self.check_path(path));
390        txn.set_locked(path, reply);
391    }
392
393    fn unlock_subtree(&mut self, txn: &mut Txn, path: Path, reply: Reply) {
394        let path = or_reply!(reply, self.check_path(path));
395        txn.set_unlocked(path, reply);
396    }
397
398    fn set_data(&mut self, txn: &mut Txn, path: Path, value: Value, reply: Reply) {
399        let path = or_reply!(reply, self.check_path(path));
400        txn.set_data(true, path, value, reply);
401    }
402
403    fn create_sheet(
404        &self,
405        txn: &mut Txn,
406        path: Path,
407        rows: usize,
408        columns: usize,
409        max_rows: usize,
410        max_columns: usize,
411        lock: bool,
412        reply: Reply,
413    ) {
414        let path = or_reply!(reply, self.check_path(path));
415        if rows > max_rows || columns > max_columns {
416            let m = literal!("rows <= max_rows && columns <= max_columns");
417            if let Some(reply) = reply {
418                reply.send(Value::error(m));
419            }
420        } else {
421            txn.create_sheet(path, rows, columns, max_rows, max_columns, lock, reply);
422        }
423    }
424
425    fn create_table(
426        &self,
427        txn: &mut Txn,
428        path: Path,
429        rows: Vec<ArcStr>,
430        columns: Vec<ArcStr>,
431        lock: bool,
432        reply: Reply,
433    ) {
434        let path = or_reply!(reply, self.check_path(path));
435        txn.create_table(path, rows, columns, lock, reply);
436    }
437
438    fn process_rpc_requests(&mut self, txn: &mut Txn, reqs: &mut Vec<RpcRequest>) {
439        let mut process_non_packed = |reply: Sendable, req: RpcRequestKind| match req {
440            RpcRequestKind::Delete(path) => self.delete_path(txn, path, Some(reply)),
441            RpcRequestKind::DeleteSubtree(path) => {
442                self.delete_subtree(txn, path, Some(reply))
443            }
444            RpcRequestKind::LockSubtree(path) => {
445                self.lock_subtree(txn, path, Some(reply))
446            }
447            RpcRequestKind::UnlockSubtree(path) => {
448                self.unlock_subtree(txn, path, Some(reply))
449            }
450            RpcRequestKind::SetData { path, value } => {
451                self.set_data(txn, path, value, Some(reply))
452            }
453            RpcRequestKind::CreateSheet {
454                path,
455                rows,
456                columns,
457                max_rows,
458                max_columns,
459                lock,
460            } => self.create_sheet(
461                txn,
462                path,
463                rows,
464                columns,
465                max_rows,
466                max_columns,
467                lock,
468                Some(reply),
469            ),
470            RpcRequestKind::AddSheetRows(path, rows) => {
471                txn.add_sheet_rows(path, rows, Some(reply));
472            }
473            RpcRequestKind::AddSheetCols(path, cols) => {
474                txn.add_sheet_columns(path, cols, Some(reply));
475            }
476            RpcRequestKind::DelSheetRows(path, rows) => {
477                txn.del_sheet_rows(path, rows, Some(reply));
478            }
479            RpcRequestKind::DelSheetCols(path, cols) => {
480                txn.del_sheet_columns(path, cols, Some(reply));
481            }
482            RpcRequestKind::CreateTable { path, rows, columns, lock } => {
483                self.create_table(txn, path, rows, columns, lock, Some(reply))
484            }
485            RpcRequestKind::AddTableRows(path, rows) => {
486                txn.add_table_rows(path, rows, Some(reply));
487            }
488            RpcRequestKind::AddTableCols(path, cols) => {
489                txn.add_table_columns(path, cols, Some(reply));
490            }
491            RpcRequestKind::DelTableRows(path, rows) => {
492                txn.del_table_rows(path, rows, Some(reply));
493            }
494            RpcRequestKind::DelTableCols(path, cols) => {
495                txn.del_table_columns(path, cols, Some(reply));
496            }
497            RpcRequestKind::AddRoot(path) => {
498                txn.add_root(path, Some(reply));
499            }
500            RpcRequestKind::DelRoot(path) => {
501                txn.del_root(path, Some(reply));
502            }
503            RpcRequestKind::Packed(_) => unreachable!(),
504        };
505        for mut req in reqs.drain(..) {
506            match req.kind {
507                RpcRequestKind::Packed(reqs) => {
508                    let res = Arc::new(Mutex::new(Value::Null));
509                    for req in reqs {
510                        let reply = Sendable::Packed(res.clone());
511                        process_non_packed(reply, req)
512                    }
513                    req.reply.send(mem::replace(&mut *res.lock(), Value::Null));
514                }
515                k => {
516                    let reply = Sendable::Rpc(req.reply);
517                    process_non_packed(reply, k)
518                }
519            }
520        }
521    }
522
523    fn remove_deleted_published(&mut self, path: &Path) {
524        self.remove_advertisement(&path);
525        if let Some(p) = self.by_path.remove(path) {
526            self.by_id.remove(&p.val.id());
527        }
528    }
529
530    fn remove_advertisement(&self, path: &Path) {
531        if let Some((_, def)) = self.get_root(&path) {
532            def.remove_advertisement(&path);
533        }
534    }
535
536    fn advertise_path(&self, path: Path) {
537        if !self.params.sparse {
538            if let Some((_, def)) = self.get_root(&path) {
539                let _: Result<_> = def.advertise(path);
540            }
541        }
542    }
543
544    fn process_update(&mut self, batch: &mut UpdateBatch, mut update: db::Update) {
545        use db::UpdateKind;
546        let mut locked = false;
547        let mut roots = false;
548        for path in update.added_roots.drain(..) {
549            roots = true;
550            match self.publisher.publish_default(path.clone()) {
551                Err(e) => error!("failed to publish_default {path} {e:?}"),
552                Ok(dh) => {
553                    self.roots.insert(path, dh);
554                }
555            }
556        }
557        for path in update.removed_roots.drain(..) {
558            roots = true;
559            self.roots.remove(&path);
560        }
561        for (path, value) in update.data.drain(..) {
562            match value {
563                UpdateKind::Updated(v) => {
564                    if let Some(p) = self.by_path.get(&path) {
565                        p.val.update(batch, v)
566                    }
567                }
568                UpdateKind::Inserted(v) => match self.by_path.get(&path) {
569                    Some(p) => p.val.update(batch, v),
570                    None => self.advertise_path(path),
571                },
572                UpdateKind::Deleted => self.remove_deleted_published(&path),
573            }
574        }
575        for path in update.locked.drain(..) {
576            locked = true;
577            if self.is_locked_gen(&path, true) {
578                self.locked.remove(&path);
579            } else {
580                self.locked.insert(path, true);
581            }
582        }
583        for path in update.unlocked.drain(..) {
584            locked = true;
585            if !self.is_locked_gen(&path, true) {
586                self.locked.remove(&path);
587            } else {
588                self.locked.insert(path, false);
589            }
590        }
591        if locked {
592            // CR estokes: log this
593            if let Some(stats) = &mut self.stats {
594                let _: Result<_> = stats.set_locked(batch, &self.locked);
595            }
596        }
597        if roots {
598            // CR estokes: log this
599            if let Some(stats) = &mut self.stats {
600                let _: Result<_> = stats.set_roots(batch, &self.roots);
601            }
602        }
603    }
604
605    fn process_command(&mut self, txn: &mut Txn, c: ToInner) {
606        match c {
607            ToInner::GetDb(s) => {
608                let _ = s.send(self.db.clone());
609            }
610            ToInner::GetPub(s) => {
611                let _ = s.send(self.publisher.clone());
612            }
613            ToInner::Commit(t, s) => {
614                *txn = t;
615                let _ = s.send(());
616            }
617        }
618    }
619
620    async fn run(mut self, mut cmd: mpsc::UnboundedReceiver<ToInner>) -> Result<()> {
621        let mut rpcbatch = Vec::new();
622        let mut batch = self.publisher.start_batch();
623        let mut txn = Txn::new();
624        async fn api_rx(api: &mut Option<RpcApi>) -> BatchItem<RpcRequest> {
625            match api {
626                Some(api) => api.rx.select_next_some().await,
627                None => future::pending().await,
628            }
629        }
630        loop {
631            select_biased! {
632                r = self.publish_events.select_next_some() => {
633                    self.process_publish_event(r);
634                },
635                r = self.roots.select_next_some() => {
636                    self.process_publish_request(r.0, r.1)
637                },
638                r = api_rx(&mut self.api).fuse() => match r {
639                    BatchItem::InBatch(v) => rpcbatch.push(v),
640                    BatchItem::EndBatch => self.process_rpc_requests(&mut txn, &mut rpcbatch)
641                },
642                u = self.db_updates.select_next_some() => {
643                    self.process_update(&mut batch, u);
644                },
645                c = cmd.select_next_some() => {
646                    self.process_command(&mut txn, c);
647                },
648                w = self.write_updates_rx.select_next_some() => {
649                    self.process_writes(&mut txn, w);
650                }
651                complete => break,
652            }
653            if txn.dirty() {
654                self.db.commit(mem::replace(&mut txn, Txn::new()));
655            }
656            if batch.len() > 0 {
657                let timeout = self.params.timeout.map(Duration::from_secs);
658                let new_batch = self.publisher.start_batch();
659                mem::replace(&mut batch, new_batch).commit(timeout).await;
660            }
661        }
662        self.publisher.clone().shutdown().await;
663        self.db.flush_async().await?;
664        Ok(())
665    }
666}
667
/// Requests sent from `Container` handles to the background task running
/// `ContainerInner::run`.
enum ToInner {
    /// reply with a clone of the database
    GetDb(oneshot::Sender<Db>),
    /// reply with a clone of the publisher
    GetPub(oneshot::Sender<Publisher>),
    /// install the transaction as the run loop's pending transaction, then
    /// signal the sender
    Commit(Txn, oneshot::Sender<()>),
}
673
/// A cloneable handle to a running container. Every clone sends its
/// requests to the same background task over an unbounded channel.
#[derive(Clone)]
pub struct Container(mpsc::UnboundedSender<ToInner>);
676
677impl Container {
678    /// Start the container with the specified
679    /// parameters. Initialization is performed on the calling task,
680    /// and an error is returned if it fails. Once initialization is
681    /// complete, the container continues to run on a background task,
682    /// and errors are reported by log::error!.
683    pub async fn start(
684        cfg: Config,
685        auth: DesiredAuth,
686        params: Params,
687    ) -> Result<Container> {
688        let (w, r) = mpsc::unbounded();
689        let mut c = ContainerInner::new(cfg, auth, params).await?;
690        c.init().await?;
691        task::spawn(async move {
692            match c.run(r).await {
693                Err(e) => error!("container stopped with error {}", e),
694                Ok(()) => info!("container stopped gracefully"),
695            }
696        });
697        Ok(Container(w))
698    }
699
700    /// Fetch the database associated with the container. You can
701    /// perform any read operations you like on the database, however
702    /// keep in mind that clients can write while you are
703    /// reading. Transactions must be submitted to the main task to
704    /// ensure some level of coordination between client writes and
705    /// application transactions.
706    pub async fn db(&self) -> Result<Db> {
707        let (w, r) = oneshot::channel();
708        self.0.unbounded_send(ToInner::GetDb(w))?;
709        Ok(r.await?)
710    }
711
712    /// Submit a database transaction to be processed, wait for it to
713    /// be accepted.
714    pub async fn commit(&self, txn: Txn) -> Result<()> {
715        let (w, r) = oneshot::channel();
716        self.0.unbounded_send(ToInner::Commit(txn, w))?;
717        Ok(r.await?)
718    }
719
720    /// Submit a database transaction without waiting
721    pub fn commit_unbounded(&self, txn: Txn) -> Result<()> {
722        let (w, _r) = oneshot::channel();
723        Ok(self.0.unbounded_send(ToInner::Commit(txn, w))?)
724    }
725
726    /// Retreive the container's publisher.
727    pub async fn publisher(&self) -> Result<Publisher> {
728        let (w, r) = oneshot::channel();
729        self.0.unbounded_send(ToInner::GetPub(w))?;
730        Ok(r.await?)
731    }
732}