1#[macro_use]
2extern crate netidx_protocols;
3
4mod db;
5mod rpcs;
6mod stats;
7
8use crate::rpcs::RpcApi;
9use ahash::AHashMap;
10use anyhow::{bail, Result};
11use arcstr::{literal, ArcStr};
12pub use db::{Datum, DatumKind, Db, Reply, Sendable, Txn};
13use derive_builder::Builder;
14use futures::{
15 self,
16 channel::{mpsc, oneshot},
17 future,
18 prelude::*,
19 select_biased,
20 stream::FusedStream,
21};
22use log::{error, info};
23use netidx::{
24 config::Config,
25 path::Path,
26 publisher::{
27 BindCfg, DefaultHandle, Event as PEvent, Id, PublishFlags, Publisher,
28 PublisherBuilder, UpdateBatch, Val, Value, WriteRequest,
29 },
30 resolver_client::DesiredAuth,
31 utils::BatchItem,
32};
33use nohash::IntMap;
34use parking_lot::Mutex;
35use poolshark::global::GPooled;
36use rpcs::{RpcRequest, RpcRequestKind};
37use stats::Stats;
38use std::{
39 collections::{
40 BTreeMap,
41 Bound::{self, *},
42 },
43 mem,
44 ops::{Deref, DerefMut},
45 path::PathBuf,
46 pin::Pin,
47 time::Duration,
48};
49use structopt::StructOpt;
50use tokio::task;
51use triomphe::Arc;
52
// Startup configuration for a container, usable both as command line
// arguments (StructOpt) and programmatically (the derived builder).
//
// NOTE(review): plain `//` comments are used deliberately; structopt can turn
// `///` doc comments into about/help text, which would change CLI output.
#[derive(StructOpt, Builder, Debug)]
pub struct Params {
    // Bind configuration handed to the publisher builder.
    #[structopt(
        short = "b",
        long = "bind",
        help = "configure the bind address e.g. local, 192.168.0.0/16, 127.0.0.1:5000"
    )]
    #[builder(setter(strip_option), default)]
    pub bind: Option<BindCfg>,
    // Timeout in seconds applied when publisher batches are committed.
    #[structopt(
        long = "timeout",
        help = "require subscribers to consume values before timeout (seconds)"
    )]
    #[builder(setter(strip_option), default)]
    pub timeout: Option<u64>,
    // Publisher slack; only set on the publisher when present.
    #[structopt(long = "slack", help = "set the publisher slack (default 3 batches)")]
    #[builder(setter(strip_option), default)]
    pub slack: Option<usize>,
    // Maximum number of publisher clients; only set when present.
    #[structopt(
        long = "max_clients",
        help = "set the maximum number of clients (default 768)"
    )]
    #[builder(setter(strip_option), default)]
    pub max_clients: Option<usize>,
    // Base netidx path of the api. When present the rpcs are published under
    // `<api_path>/rpcs` and stats are published as well.
    #[structopt(long = "api-path", help = "the netidx path of the container api")]
    #[builder(setter(strip_option), default)]
    pub api_path: Option<Path>,
    // Database file; read by `Db::new`, which receives the whole `Params`.
    #[structopt(long = "db", help = "the db file")]
    #[builder(setter(strip_option), default)]
    pub db: Option<String>,
    // Page cache size in bytes; presumably consumed by `Db::new` — confirm.
    #[structopt(long = "cache-size", help = "db page cache size in bytes")]
    #[builder(setter(strip_option), default)]
    pub cache_size: Option<u64>,
    // When true, paths are never advertised through the default publishers.
    #[structopt(long = "sparse", help = "don't even advertise the contents of the db")]
    #[builder(default = "false")]
    pub sparse: bool,
}
90
91impl Params {
92 pub fn default_db_path() -> Option<PathBuf> {
93 dirs::data_dir().map(|mut p| {
94 p.push("netidx");
95 p.push("container");
96 p.push("db");
97 p
98 })
99 }
100}
101
/// Unwraps the `Result` expression `$r`, yielding the `Ok` value.
///
/// On `Err` the error is formatted with `{}` and, if `$reply` is `Some`, sent
/// through it as a `Value::error`. The macro then executes `return;`, so it
/// exits the *enclosing function*, which must therefore return `()`.
macro_rules! or_reply {
    ($reply:expr, $r:expr) => {
        match $r {
            Ok(r) => r,
            Err(e) => {
                // Report the failure to the requester when there is one.
                if let Some(reply) = $reply {
                    let e = Value::error(format!("{}", e));
                    reply.send(e);
                }
                // Early exit from the calling function, not just the macro.
                return;
            }
        }
    };
}
116
/// A value currently published by the container, together with its path.
/// Instances are shared between the `by_id` and `by_path` indices.
struct Published {
    /// The netidx path the value is published at.
    path: Path,
    /// The publisher handle for the value.
    val: Val,
}
121
/// The default publisher handle of every root, keyed by root path. The map is
/// also polled as a single merged stream of publish requests.
#[must_use = "streams do nothing unless polled"]
struct Roots(BTreeMap<Path, DefaultHandle>);
124
/// Gives read access to the underlying root map.
impl Deref for Roots {
    type Target = BTreeMap<Path, DefaultHandle>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
132
/// Gives mutable access to the underlying root map (insert/remove of roots).
impl DerefMut for Roots {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
138
139impl<'a> Stream for Roots {
140 type Item = (Path, oneshot::Sender<()>);
141
142 fn poll_next(
143 mut self: Pin<&mut Self>,
144 cx: &mut std::task::Context<'_>,
145 ) -> std::task::Poll<Option<Self::Item>> {
146 use std::task::Poll;
147 for dh in self.values_mut() {
148 match Pin::new(&mut *dh).poll_next(cx) {
149 Poll::Pending => (),
150 r @ Poll::Ready(_) => return r,
151 }
152 }
153 Poll::Pending
154 }
155}
156
157impl<'a> FusedStream for Roots {
158 fn is_terminated(&self) -> bool {
159 self.values().all(|ch| ch.is_terminated())
160 }
161}
162
/// All state owned by the container task: the database, the publisher, the
/// indices of published values, and the channels the main loop selects on.
struct ContainerInner {
    /// The parameters the container was started with.
    params: Params,
    /// Handle to the database.
    db: Db,
    /// The publisher everything is published through.
    publisher: Publisher,
    /// `<params.api_path>/rpcs` when an api path was configured.
    api_path: Option<Path>,
    /// Published statistics; present only when an api path was configured.
    stats: Option<Stats>,
    /// Lock markers by path. `true` marks a locked subtree; `false` marks a
    /// subtree explicitly unlocked beneath a locked parent.
    locked: BTreeMap<Path, bool>,
    /// Sender cloned into the publisher for every published value so that
    /// subscriber writes arrive on `write_updates_rx`.
    write_updates_tx: mpsc::Sender<GPooled<Vec<WriteRequest>>>,
    /// Receives batches of subscriber writes.
    write_updates_rx: mpsc::Receiver<GPooled<Vec<WriteRequest>>>,
    /// Published values indexed by publisher id.
    by_id: IntMap<Id, Arc<Published>>,
    /// Published values indexed by path.
    by_path: AHashMap<Path, Arc<Published>>,
    /// Events emitted by the publisher.
    publish_events: mpsc::UnboundedReceiver<PEvent>,
    /// Default publisher handles, one per root.
    roots: Roots,
    /// Updates emitted by the database.
    db_updates: mpsc::UnboundedReceiver<db::Update>,
    /// The rpc api; present only when an api path was configured.
    api: Option<rpcs::RpcApi>,
}
179
180impl ContainerInner {
181 async fn new(cfg: Config, auth: DesiredAuth, params: Params) -> Result<Self> {
182 let (publish_events_tx, publish_events) = mpsc::unbounded();
183 let mut publisher = PublisherBuilder::new(cfg.clone());
184 publisher.desired_auth(auth.clone()).bind_cfg(params.bind);
185 if let Some(n) = params.slack {
186 publisher.slack(n);
187 }
188 if let Some(n) = params.max_clients {
189 publisher.max_clients(n);
190 }
191 let publisher = publisher.build().await?;
192 publisher.events(publish_events_tx);
193 let (db, db_updates) =
194 Db::new(¶ms, publisher.clone(), params.api_path.clone())?;
195 let (write_updates_tx, write_updates_rx) = mpsc::channel(3);
196 let (api_path, api) = match params.api_path.as_ref() {
197 None => (None, None),
198 Some(api_path) => {
199 let api_path = api_path.append("rpcs");
200 let api = rpcs::RpcApi::new(&publisher, &api_path)?;
201 (Some(api_path), Some(api))
202 }
203 };
204 let stats = match params.api_path.as_ref() {
205 None => None,
206 Some(p) => Some(Stats::new(publisher.clone(), p.clone())),
207 };
208 Ok(Self {
209 params,
210 api_path,
211 stats,
212 locked: BTreeMap::new(),
213 roots: Roots(BTreeMap::new()),
214 db,
215 publisher,
216 write_updates_tx,
217 write_updates_rx,
218 by_id: IntMap::default(),
219 by_path: AHashMap::default(),
220 publish_events,
221 db_updates,
222 api,
223 })
224 }
225
226 fn get_root(&self, path: &Path) -> Option<(&Path, &DefaultHandle)> {
227 match self
228 .roots
229 .range::<str, (Bound<&str>, Bound<&str>)>((
230 Unbounded,
231 Included(path.as_ref()),
232 ))
233 .next_back()
234 {
235 None => None,
236 Some((prev, def)) => {
237 if path.starts_with(prev.as_ref()) {
238 Some((prev, def))
239 } else {
240 None
241 }
242 }
243 }
244 }
245
246 fn check_path(&self, path: Path) -> Result<Path> {
247 if self.get_root(&path).is_some() {
248 Ok(path)
249 } else {
250 bail!("non root path")
251 }
252 }
253
254 fn publish_data(&mut self, path: Path, value: Value) -> Result<()> {
255 self.advertise_path(path.clone());
256 let val = self.publisher.publish_with_flags(
257 PublishFlags::DESTROY_ON_IDLE,
258 path.clone(),
259 value.clone(),
260 )?;
261 let id = val.id();
262 self.publisher.writes(val.id(), self.write_updates_tx.clone());
263 let published = Arc::new(Published { path: path.clone(), val });
264 self.by_id.insert(id, published.clone());
265 self.by_path.insert(path.clone(), published);
266 Ok(())
267 }
268
269 async fn init(&mut self) -> Result<()> {
270 let mut batch = self.publisher.start_batch();
271 for res in self.db.roots() {
272 let path = res?;
273 let def = self.publisher.publish_default(path.clone())?;
274 self.roots.insert(path, def);
275 }
276 if let Some(stats) = &mut self.stats {
277 let _ = stats.set_roots(&mut batch, &self.roots);
278 }
279 for res in self.db.locked() {
280 let (path, locked) = res?;
281 let path = self.check_path(path)?;
282 self.locked.insert(path, locked);
283 }
284 if let Some(stats) = &mut self.stats {
285 let _ = stats.set_locked(&mut batch, &self.locked);
286 }
287 for res in self.db.iter() {
288 let (path, kind, _) = res?;
289 match kind {
290 DatumKind::Data | DatumKind::Formula => {
291 let path = self.check_path(path)?;
292 self.advertise_path(path);
293 }
294 DatumKind::Deleted | DatumKind::Invalid => (),
295 }
296 }
297 Ok(batch.commit(self.params.timeout.map(Duration::from_secs)).await)
298 }
299
300 fn process_writes(&mut self, txn: &mut Txn, mut writes: GPooled<Vec<WriteRequest>>) {
301 for req in writes.drain(..) {
303 let reply = req.send_result.map(Sendable::Write);
304 if let Some(p) = self.by_id.get(&req.id) {
305 txn.set_data(true, p.path.clone(), req.value, reply);
306 }
307 }
308 }
309
310 fn is_locked_gen(&self, path: &Path, parent_only: bool) -> bool {
311 let mut iter = self.locked.range::<str, (Bound<&str>, Bound<&str>)>((
312 Bound::Unbounded,
313 if parent_only {
314 Bound::Excluded(path.as_ref())
315 } else {
316 Bound::Included(path.as_ref())
317 },
318 ));
319 loop {
320 match iter.next_back() {
321 None => break false,
322 Some((p, locked)) if Path::is_parent(p, &path) => break *locked,
323 Some(_) => (),
324 }
325 }
326 }
327
    /// Returns the lock state applying to `path`, taking a marker stored at
    /// `path` itself into account as well as markers on its parents.
    fn is_locked(&self, path: &Path) -> bool {
        self.is_locked_gen(path, false)
    }
331
332 fn process_publish_request(&mut self, path: Path, reply: oneshot::Sender<()>) {
333 match self.check_path(path) {
334 Err(_) => {
335 let _: Result<_, _> = reply.send(());
337 }
338 Ok(path) => {
339 match self.db.lookup(path.as_ref()) {
340 Ok(Some(Datum::Data(v) | Datum::Formula(v, _))) => {
341 let _: Result<()> = self.publish_data(path, v);
342 }
343 Err(_) | Ok(Some(Datum::Deleted)) | Ok(None) => {
344 let locked = self.is_locked(&path);
345 let api = self
346 .api_path
347 .as_ref()
348 .map(|p| Path::is_parent(p, &path))
349 .unwrap_or(false);
350 if !locked && !api {
351 let _: Result<()> = self.publish_data(path, Value::Null);
352 };
353 }
354 }
355 let _: Result<_, _> = reply.send(());
356 }
357 }
358 }
359
360 fn process_publish_event(&mut self, e: PEvent) {
361 match e {
362 PEvent::Subscribe(_, _) | PEvent::Unsubscribe(_, _) => (),
363 PEvent::Destroyed(id) => {
364 if let Some(p) = self.by_id.remove(&id) {
365 self.by_path.remove(&p.path);
366 }
367 }
368 }
369 }
370
371 fn delete_path(&mut self, txn: &mut Txn, path: Path, reply: Reply) {
372 let path = or_reply!(reply, self.check_path(path));
373 let bn = Path::basename(&path);
374 if bn == Some(".formula") || bn == Some(".on-write") {
375 if let Some(path) = Path::dirname(&path) {
376 txn.remove(Path::from(ArcStr::from(path)), reply);
377 }
378 } else {
379 txn.remove(path, reply);
380 }
381 }
382
    /// Queues the removal of the whole subtree at `path` in `txn`. If `path`
    /// is not under a root, the error is sent through `reply` (when present)
    /// and nothing is queued.
    fn delete_subtree(&mut self, txn: &mut Txn, path: Path, reply: Reply) {
        // `or_reply!` returns from this function on error.
        let path = or_reply!(reply, self.check_path(path));
        txn.remove_subtree(path, reply);
    }
387
    /// Queues locking of the subtree at `path` in `txn`. If `path` is not
    /// under a root, the error is sent through `reply` (when present) and
    /// nothing is queued.
    fn lock_subtree(&mut self, txn: &mut Txn, path: Path, reply: Reply) {
        // `or_reply!` returns from this function on error.
        let path = or_reply!(reply, self.check_path(path));
        txn.set_locked(path, reply);
    }
392
    /// Queues unlocking of the subtree at `path` in `txn`. If `path` is not
    /// under a root, the error is sent through `reply` (when present) and
    /// nothing is queued.
    fn unlock_subtree(&mut self, txn: &mut Txn, path: Path, reply: Reply) {
        // `or_reply!` returns from this function on error.
        let path = or_reply!(reply, self.check_path(path));
        txn.set_unlocked(path, reply);
    }
397
    /// Queues storing `value` at `path` in `txn`. If `path` is not under a
    /// root, the error is sent through `reply` (when present) and nothing is
    /// queued.
    fn set_data(&mut self, txn: &mut Txn, path: Path, value: Value, reply: Reply) {
        // `or_reply!` returns from this function on error.
        let path = or_reply!(reply, self.check_path(path));
        txn.set_data(true, path, value, reply);
    }
402
403 fn create_sheet(
404 &self,
405 txn: &mut Txn,
406 path: Path,
407 rows: usize,
408 columns: usize,
409 max_rows: usize,
410 max_columns: usize,
411 lock: bool,
412 reply: Reply,
413 ) {
414 let path = or_reply!(reply, self.check_path(path));
415 if rows > max_rows || columns > max_columns {
416 let m = literal!("rows <= max_rows && columns <= max_columns");
417 if let Some(reply) = reply {
418 reply.send(Value::error(m));
419 }
420 } else {
421 txn.create_sheet(path, rows, columns, max_rows, max_columns, lock, reply);
422 }
423 }
424
    /// Queues the creation of a table at `path` with the given row and column
    /// names in `txn`. If `path` is not under a root, the error is sent
    /// through `reply` (when present) and nothing is queued.
    fn create_table(
        &self,
        txn: &mut Txn,
        path: Path,
        rows: Vec<ArcStr>,
        columns: Vec<ArcStr>,
        lock: bool,
        reply: Reply,
    ) {
        // `or_reply!` returns from this function on error.
        let path = or_reply!(reply, self.check_path(path));
        txn.create_table(path, rows, columns, lock, reply);
    }
437
    /// Drains `reqs` and translates every rpc request into operations on
    /// `txn`.
    ///
    /// A `Packed` request carries a list of sub-requests. Each sub-request is
    /// processed with a `Sendable::Packed` reply sharing one result slot, and
    /// a single reply is sent for the whole pack afterwards. Every other
    /// request is answered through its own `Sendable::Rpc` reply.
    fn process_rpc_requests(&mut self, txn: &mut Txn, reqs: &mut Vec<RpcRequest>) {
        // Dispatches one non-packed request. Some kinds go through the
        // `self` helpers (which check the path against the roots first);
        // the rest are forwarded to `txn` directly.
        let mut process_non_packed = |reply: Sendable, req: RpcRequestKind| match req {
            RpcRequestKind::Delete(path) => self.delete_path(txn, path, Some(reply)),
            RpcRequestKind::DeleteSubtree(path) => {
                self.delete_subtree(txn, path, Some(reply))
            }
            RpcRequestKind::LockSubtree(path) => {
                self.lock_subtree(txn, path, Some(reply))
            }
            RpcRequestKind::UnlockSubtree(path) => {
                self.unlock_subtree(txn, path, Some(reply))
            }
            RpcRequestKind::SetData { path, value } => {
                self.set_data(txn, path, value, Some(reply))
            }
            RpcRequestKind::CreateSheet {
                path,
                rows,
                columns,
                max_rows,
                max_columns,
                lock,
            } => self.create_sheet(
                txn,
                path,
                rows,
                columns,
                max_rows,
                max_columns,
                lock,
                Some(reply),
            ),
            RpcRequestKind::AddSheetRows(path, rows) => {
                txn.add_sheet_rows(path, rows, Some(reply));
            }
            RpcRequestKind::AddSheetCols(path, cols) => {
                txn.add_sheet_columns(path, cols, Some(reply));
            }
            RpcRequestKind::DelSheetRows(path, rows) => {
                txn.del_sheet_rows(path, rows, Some(reply));
            }
            RpcRequestKind::DelSheetCols(path, cols) => {
                txn.del_sheet_columns(path, cols, Some(reply));
            }
            RpcRequestKind::CreateTable { path, rows, columns, lock } => {
                self.create_table(txn, path, rows, columns, lock, Some(reply))
            }
            RpcRequestKind::AddTableRows(path, rows) => {
                txn.add_table_rows(path, rows, Some(reply));
            }
            RpcRequestKind::AddTableCols(path, cols) => {
                txn.add_table_columns(path, cols, Some(reply));
            }
            RpcRequestKind::DelTableRows(path, rows) => {
                txn.del_table_rows(path, rows, Some(reply));
            }
            RpcRequestKind::DelTableCols(path, cols) => {
                txn.del_table_columns(path, cols, Some(reply));
            }
            RpcRequestKind::AddRoot(path) => {
                txn.add_root(path, Some(reply));
            }
            RpcRequestKind::DelRoot(path) => {
                txn.del_root(path, Some(reply));
            }
            // NOTE(review): panics if a pack contains another pack; assumes
            // the rpc layer never produces nested packs — confirm.
            RpcRequestKind::Packed(_) => unreachable!(),
        };
        for mut req in reqs.drain(..) {
            match req.kind {
                RpcRequestKind::Packed(reqs) => {
                    // Result slot shared by every sub-request of the pack.
                    let res = Arc::new(Mutex::new(Value::Null));
                    for req in reqs {
                        let reply = Sendable::Packed(res.clone());
                        process_non_packed(reply, req)
                    }
                    // Reply with whatever is in the slot at this point,
                    // leaving `Null` behind.
                    req.reply.send(mem::replace(&mut *res.lock(), Value::Null));
                }
                k => {
                    let reply = Sendable::Rpc(req.reply);
                    process_non_packed(reply, k)
                }
            }
        }
    }
522
523 fn remove_deleted_published(&mut self, path: &Path) {
524 self.remove_advertisement(&path);
525 if let Some(p) = self.by_path.remove(path) {
526 self.by_id.remove(&p.val.id());
527 }
528 }
529
    /// Withdraws the advertisement of `path` from the default publisher of
    /// the root containing it. Does nothing when no root contains `path`.
    fn remove_advertisement(&self, path: &Path) {
        if let Some((_, def)) = self.get_root(&path) {
            def.remove_advertisement(&path);
        }
    }
535
536 fn advertise_path(&self, path: Path) {
537 if !self.params.sparse {
538 if let Some((_, def)) = self.get_root(&path) {
539 let _: Result<_> = def.advertise(path);
540 }
541 }
542 }
543
    /// Applies one database update to the published state.
    ///
    /// In order: added roots get a default publisher, removed roots lose
    /// theirs, data changes are pushed to published values (or advertised /
    /// withdrawn), and the lock table is adjusted. Stats are refreshed in
    /// `batch` when the roots or the lock table changed.
    fn process_update(&mut self, batch: &mut UpdateBatch, mut update: db::Update) {
        use db::UpdateKind;
        // Track whether the lock table / root set changed so stats are only
        // republished when needed.
        let mut locked = false;
        let mut roots = false;
        for path in update.added_roots.drain(..) {
            roots = true;
            match self.publisher.publish_default(path.clone()) {
                Err(e) => error!("failed to publish_default {path} {e:?}"),
                Ok(dh) => {
                    self.roots.insert(path, dh);
                }
            }
        }
        for path in update.removed_roots.drain(..) {
            roots = true;
            self.roots.remove(&path);
        }
        for (path, value) in update.data.drain(..) {
            match value {
                // Updates only matter for values that are currently published.
                UpdateKind::Updated(v) => {
                    if let Some(p) = self.by_path.get(&path) {
                        p.val.update(batch, v)
                    }
                }
                // A new value is either pushed to an existing publication or
                // merely advertised.
                UpdateKind::Inserted(v) => match self.by_path.get(&path) {
                    Some(p) => p.val.update(batch, v),
                    None => self.advertise_path(path),
                },
                UpdateKind::Deleted => self.remove_deleted_published(&path),
            }
        }
        for path in update.locked.drain(..) {
            locked = true;
            // If a parent already makes this path locked, a marker here is
            // redundant and is removed; otherwise record the lock.
            if self.is_locked_gen(&path, true) {
                self.locked.remove(&path);
            } else {
                self.locked.insert(path, true);
            }
        }
        for path in update.unlocked.drain(..) {
            locked = true;
            // If no parent locks this path, removing the marker is enough;
            // otherwise an explicit `false` marker overrides the parent.
            if !self.is_locked_gen(&path, true) {
                self.locked.remove(&path);
            } else {
                self.locked.insert(path, false);
            }
        }
        if locked {
            if let Some(stats) = &mut self.stats {
                // Stats are best effort; failures are ignored.
                let _: Result<_> = stats.set_locked(batch, &self.locked);
            }
        }
        if roots {
            if let Some(stats) = &mut self.stats {
                let _: Result<_> = stats.set_roots(batch, &self.roots);
            }
        }
    }
604
    /// Serves one command sent through a `Container` handle. Failures to
    /// deliver a response (the requester went away) are ignored.
    fn process_command(&mut self, txn: &mut Txn, c: ToInner) {
        match c {
            ToInner::GetDb(s) => {
                let _ = s.send(self.db.clone());
            }
            ToInner::GetPub(s) => {
                let _ = s.send(self.publisher.clone());
            }
            ToInner::Commit(t, s) => {
                // The submitted transaction replaces the pending one; the
                // main loop hands it to the database after this call.
                // NOTE(review): anything already queued in `*txn` would be
                // discarded here; `run` appears to flush a dirty txn after
                // every event, so it should be empty — confirm.
                *txn = t;
                let _ = s.send(());
            }
        }
    }
619
    /// The container main loop.
    ///
    /// Waits, in priority order (`select_biased!`), for publisher events,
    /// publish requests from the roots, rpc requests, database updates,
    /// commands from `Container` handles, and subscriber writes. After every
    /// event a dirty transaction is handed to the database and a non-empty
    /// publisher batch is committed. When the loop ends the publisher is shut
    /// down and the database is flushed.
    ///
    /// # Errors
    /// Returns the error of the final database flush, if any.
    async fn run(mut self, mut cmd: mpsc::UnboundedReceiver<ToInner>) -> Result<()> {
        // Rpc requests accumulate here until the end of their batch.
        let mut rpcbatch = Vec::new();
        let mut batch = self.publisher.start_batch();
        let mut txn = Txn::new();
        // Next rpc batch item, or a future that never resolves when the api
        // is disabled.
        async fn api_rx(api: &mut Option<RpcApi>) -> BatchItem<RpcRequest> {
            match api {
                Some(api) => api.rx.select_next_some().await,
                None => future::pending().await,
            }
        }
        loop {
            select_biased! {
                r = self.publish_events.select_next_some() => {
                    self.process_publish_event(r);
                },
                r = self.roots.select_next_some() => {
                    self.process_publish_request(r.0, r.1)
                },
                // NOTE(review): this future is created and fused anew on
                // every iteration, so it never reports itself terminated;
                // the `complete` arm below may therefore be unreachable —
                // confirm.
                r = api_rx(&mut self.api).fuse() => match r {
                    BatchItem::InBatch(v) => rpcbatch.push(v),
                    BatchItem::EndBatch => self.process_rpc_requests(&mut txn, &mut rpcbatch)
                },
                u = self.db_updates.select_next_some() => {
                    self.process_update(&mut batch, u);
                },
                c = cmd.select_next_some() => {
                    self.process_command(&mut txn, c);
                },
                w = self.write_updates_rx.select_next_some() => {
                    self.process_writes(&mut txn, w);
                }
                complete => break,
            }
            // Hand any queued operations to the database, leaving a fresh
            // empty transaction behind.
            if txn.dirty() {
                self.db.commit(mem::replace(&mut txn, Txn::new()));
            }
            // Flush pending publisher updates, swapping in a new batch first
            // because `commit` consumes the batch.
            if batch.len() > 0 {
                let timeout = self.params.timeout.map(Duration::from_secs);
                let new_batch = self.publisher.start_batch();
                mem::replace(&mut batch, new_batch).commit(timeout).await;
            }
        }
        self.publisher.clone().shutdown().await;
        self.db.flush_async().await?;
        Ok(())
    }
666}
667
/// Commands sent from a `Container` handle to the container task.
enum ToInner {
    /// Request a clone of the database handle.
    GetDb(oneshot::Sender<Db>),
    /// Request a clone of the publisher.
    GetPub(oneshot::Sender<Publisher>),
    /// Submit a transaction; the sender is signalled once the task has
    /// accepted it.
    Commit(Txn, oneshot::Sender<()>),
}
673
/// A cloneable handle to a running container task. All operations are
/// performed by sending commands to the task over an unbounded channel.
#[derive(Clone)]
pub struct Container(mpsc::UnboundedSender<ToInner>);
676
677impl Container {
678 pub async fn start(
684 cfg: Config,
685 auth: DesiredAuth,
686 params: Params,
687 ) -> Result<Container> {
688 let (w, r) = mpsc::unbounded();
689 let mut c = ContainerInner::new(cfg, auth, params).await?;
690 c.init().await?;
691 task::spawn(async move {
692 match c.run(r).await {
693 Err(e) => error!("container stopped with error {}", e),
694 Ok(()) => info!("container stopped gracefully"),
695 }
696 });
697 Ok(Container(w))
698 }
699
700 pub async fn db(&self) -> Result<Db> {
707 let (w, r) = oneshot::channel();
708 self.0.unbounded_send(ToInner::GetDb(w))?;
709 Ok(r.await?)
710 }
711
712 pub async fn commit(&self, txn: Txn) -> Result<()> {
715 let (w, r) = oneshot::channel();
716 self.0.unbounded_send(ToInner::Commit(txn, w))?;
717 Ok(r.await?)
718 }
719
720 pub fn commit_unbounded(&self, txn: Txn) -> Result<()> {
722 let (w, _r) = oneshot::channel();
723 Ok(self.0.unbounded_send(ToInner::Commit(txn, w))?)
724 }
725
726 pub async fn publisher(&self) -> Result<Publisher> {
728 let (w, r) = oneshot::channel();
729 self.0.unbounded_send(ToInner::GetPub(w))?;
730 Ok(r.await?)
731 }
732}