1#[macro_use]
2extern crate netidx_protocols;
3
4mod db;
5mod rpcs;
6mod stats;
7
8use crate::rpcs::RpcApi;
9use anyhow::{bail, Result};
10use arcstr::{literal, ArcStr};
11pub use db::{Datum, DatumKind, Db, Reply, Sendable, Txn};
12use derive_builder::Builder;
13use futures::{
14 self,
15 channel::{mpsc, oneshot},
16 future,
17 prelude::*,
18 select_biased,
19 stream::FusedStream,
20};
21use fxhash::FxHashMap;
22use log::{error, info};
23use netidx::{
24 config::Config,
25 path::Path,
26 publisher::{
27 BindCfg, DefaultHandle, Event as PEvent, Id, PublishFlags, Publisher,
28 PublisherBuilder, UpdateBatch, Val, Value, WriteRequest,
29 },
30 resolver_client::DesiredAuth,
31 utils::BatchItem,
32};
33use parking_lot::Mutex;
34use poolshark::global::GPooled;
35use rpcs::{RpcRequest, RpcRequestKind};
36use stats::Stats;
37use std::{
38 collections::{
39 BTreeMap,
40 Bound::{self, *},
41 },
42 mem,
43 ops::{Deref, DerefMut},
44 path::PathBuf,
45 pin::Pin,
46 time::Duration,
47};
48use structopt::StructOpt;
49use tokio::task;
50use triomphe::Arc;
51
// Startup parameters of a container.
//
// The struct doubles as a command line definition (`StructOpt`) and as a
// programmatic configuration (`Builder`). Every `Option` field may be left
// unset.
//
// NOTE(review): plain `//` comments are used here on purpose; derive macros
// such as `StructOpt` may pick up `///` doc comments as help text — confirm
// before converting these to doc comments.
#[derive(StructOpt, Builder, Debug)]
pub struct Params {
    // Bind configuration handed to the publisher builder.
    #[structopt(
        short = "b",
        long = "bind",
        help = "configure the bind address e.g. local, 192.168.0.0/16, 127.0.0.1:5000"
    )]
    #[builder(setter(strip_option), default)]
    pub bind: Option<BindCfg>,
    // Timeout in seconds; converted with `Duration::from_secs` and passed to
    // every publisher batch commit.
    #[structopt(
        long = "timeout",
        help = "require subscribers to consume values before timeout (seconds)"
    )]
    #[builder(setter(strip_option), default)]
    pub timeout: Option<u64>,
    // Publisher slack; only applied to the publisher builder when set.
    #[structopt(long = "slack", help = "set the publisher slack (default 3 batches)")]
    #[builder(setter(strip_option), default)]
    pub slack: Option<usize>,
    // Maximum number of clients; only applied to the publisher builder when set.
    #[structopt(
        long = "max_clients",
        help = "set the maximum number of clients (default 768)"
    )]
    #[builder(setter(strip_option), default)]
    pub max_clients: Option<usize>,
    // Base path of the container api. When unset neither the rpc api nor the
    // stats are created.
    #[structopt(long = "api-path", help = "the netidx path of the container api")]
    #[builder(setter(strip_option), default)]
    pub api_path: Option<Path>,
    // The database file (see `Params::default_db_path` for a default location).
    #[structopt(long = "db", help = "the db file")]
    #[builder(setter(strip_option), default)]
    pub db: Option<String>,
    // Page cache size of the database, in bytes.
    #[structopt(long = "cache-size", help = "db page cache size in bytes")]
    #[builder(setter(strip_option), default)]
    pub cache_size: Option<u64>,
    // When true the container never advertises paths under its roots.
    #[structopt(long = "sparse", help = "don't even advertise the contents of the db")]
    #[builder(default = "false")]
    pub sparse: bool,
}
89
90impl Params {
91 pub fn default_db_path() -> Option<PathBuf> {
92 dirs::data_dir().map(|mut p| {
93 p.push("netidx");
94 p.push("container");
95 p.push("db");
96 p
97 })
98 }
99}
100
/// Unwrap `$r`, or report the failure and return from the enclosing function.
///
/// On `Ok(v)` the macro evaluates to `v`. On `Err(e)` the error is rendered
/// with `Display`, wrapped with `Value::error` and sent through `$reply`
/// (an `Option` of something with a `send` method) when a reply is present;
/// the enclosing function then returns `()`.
macro_rules! or_reply {
    ($reply:expr, $r:expr) => {
        match $r {
            Err(err) => {
                match $reply {
                    Some(sink) => {
                        sink.send(Value::error(err.to_string()));
                    }
                    None => (),
                }
                return;
            }
            Ok(val) => val,
        }
    };
}
115
/// A value the container currently has published. Instances are shared
/// between the by-id and by-path indexes of `ContainerInner`.
struct Published {
    /// The path the value was published under.
    path: Path,
    /// The publisher handle used to push updates for this value.
    val: Val,
}
120
/// The set of root paths served by the container, keyed by root path. Each
/// entry holds the `DefaultHandle` returned by `publish_default` for that
/// root. The wrapper exists so the whole set can be polled as one stream.
#[must_use = "streams do nothing unless polled"]
struct Roots(BTreeMap<Path, DefaultHandle>);
123
124impl Deref for Roots {
125 type Target = BTreeMap<Path, DefaultHandle>;
126
127 fn deref(&self) -> &Self::Target {
128 &self.0
129 }
130}
131
132impl DerefMut for Roots {
133 fn deref_mut(&mut self) -> &mut Self::Target {
134 &mut self.0
135 }
136}
137
138impl<'a> Stream for Roots {
139 type Item = (Path, oneshot::Sender<()>);
140
141 fn poll_next(
142 mut self: Pin<&mut Self>,
143 cx: &mut std::task::Context<'_>,
144 ) -> std::task::Poll<Option<Self::Item>> {
145 use std::task::Poll;
146 for dh in self.values_mut() {
147 match Pin::new(&mut *dh).poll_next(cx) {
148 Poll::Pending => (),
149 r @ Poll::Ready(_) => return r,
150 }
151 }
152 Poll::Pending
153 }
154}
155
156impl<'a> FusedStream for Roots {
157 fn is_terminated(&self) -> bool {
158 self.values().all(|ch| ch.is_terminated())
159 }
160}
161
/// The state owned by the container task. It is created by `new`, populated
/// by `init` and then consumed by `run`.
struct ContainerInner {
    /// The parameters the container was started with.
    params: Params,
    /// Handle to the database.
    db: Db,
    /// The publisher everything is published through.
    publisher: Publisher,
    /// Path of the rpc api (`<params.api_path>/rpcs`), if an api path was configured.
    api_path: Option<Path>,
    /// Published statistics, only present when an api path was configured.
    stats: Option<Stats>,
    /// Explicit lock markers by path; `true` means locked, `false` unlocked.
    /// The effective state of a path is decided by its closest marked ancestor.
    locked: BTreeMap<Path, bool>,
    /// Sender registered with the publisher for every published value so
    /// that subscriber writes arrive on `write_updates_rx`.
    write_updates_tx: mpsc::Sender<GPooled<Vec<WriteRequest>>>,
    /// Receiving end of the subscriber write batches.
    write_updates_rx: mpsc::Receiver<GPooled<Vec<WriteRequest>>>,
    /// Currently published values indexed by publisher id.
    by_id: FxHashMap<Id, Arc<Published>>,
    /// Currently published values indexed by path.
    by_path: FxHashMap<Path, Arc<Published>>,
    /// Events reported by the publisher.
    publish_events: mpsc::UnboundedReceiver<PEvent>,
    /// Root paths and their default publisher handles.
    roots: Roots,
    /// Updates reported by the database.
    db_updates: mpsc::UnboundedReceiver<db::Update>,
    /// The rpc api, only present when an api path was configured.
    api: Option<rpcs::RpcApi>,
}
178
179impl ContainerInner {
180 async fn new(cfg: Config, auth: DesiredAuth, params: Params) -> Result<Self> {
181 let (publish_events_tx, publish_events) = mpsc::unbounded();
182 let mut publisher = PublisherBuilder::new(cfg.clone());
183 publisher.desired_auth(auth.clone()).bind_cfg(params.bind);
184 if let Some(n) = params.slack {
185 publisher.slack(n);
186 }
187 if let Some(n) = params.max_clients {
188 publisher.max_clients(n);
189 }
190 let publisher = publisher.build().await?;
191 publisher.events(publish_events_tx);
192 let (db, db_updates) =
193 Db::new(¶ms, publisher.clone(), params.api_path.clone())?;
194 let (write_updates_tx, write_updates_rx) = mpsc::channel(3);
195 let (api_path, api) = match params.api_path.as_ref() {
196 None => (None, None),
197 Some(api_path) => {
198 let api_path = api_path.append("rpcs");
199 let api = rpcs::RpcApi::new(&publisher, &api_path)?;
200 (Some(api_path), Some(api))
201 }
202 };
203 let stats = match params.api_path.as_ref() {
204 None => None,
205 Some(p) => Some(Stats::new(publisher.clone(), p.clone())),
206 };
207 Ok(Self {
208 params,
209 api_path,
210 stats,
211 locked: BTreeMap::new(),
212 roots: Roots(BTreeMap::new()),
213 db,
214 publisher,
215 write_updates_tx,
216 write_updates_rx,
217 by_id: FxHashMap::default(),
218 by_path: FxHashMap::default(),
219 publish_events,
220 db_updates,
221 api,
222 })
223 }
224
225 fn get_root(&self, path: &Path) -> Option<(&Path, &DefaultHandle)> {
226 match self
227 .roots
228 .range::<str, (Bound<&str>, Bound<&str>)>((
229 Unbounded,
230 Included(path.as_ref()),
231 ))
232 .next_back()
233 {
234 None => None,
235 Some((prev, def)) => {
236 if path.starts_with(prev.as_ref()) {
237 Some((prev, def))
238 } else {
239 None
240 }
241 }
242 }
243 }
244
245 fn check_path(&self, path: Path) -> Result<Path> {
246 if self.get_root(&path).is_some() {
247 Ok(path)
248 } else {
249 bail!("non root path")
250 }
251 }
252
253 fn publish_data(&mut self, path: Path, value: Value) -> Result<()> {
254 self.advertise_path(path.clone());
255 let val = self.publisher.publish_with_flags(
256 PublishFlags::DESTROY_ON_IDLE,
257 path.clone(),
258 value.clone(),
259 )?;
260 let id = val.id();
261 self.publisher.writes(val.id(), self.write_updates_tx.clone());
262 let published = Arc::new(Published { path: path.clone(), val });
263 self.by_id.insert(id, published.clone());
264 self.by_path.insert(path.clone(), published);
265 Ok(())
266 }
267
268 async fn init(&mut self) -> Result<()> {
269 let mut batch = self.publisher.start_batch();
270 for res in self.db.roots() {
271 let path = res?;
272 let def = self.publisher.publish_default(path.clone())?;
273 self.roots.insert(path, def);
274 }
275 if let Some(stats) = &mut self.stats {
276 let _ = stats.set_roots(&mut batch, &self.roots);
277 }
278 for res in self.db.locked() {
279 let (path, locked) = res?;
280 let path = self.check_path(path)?;
281 self.locked.insert(path, locked);
282 }
283 if let Some(stats) = &mut self.stats {
284 let _ = stats.set_locked(&mut batch, &self.locked);
285 }
286 for res in self.db.iter() {
287 let (path, kind, _) = res?;
288 match kind {
289 DatumKind::Data | DatumKind::Formula => {
290 let path = self.check_path(path)?;
291 self.advertise_path(path);
292 }
293 DatumKind::Deleted | DatumKind::Invalid => (),
294 }
295 }
296 Ok(batch.commit(self.params.timeout.map(Duration::from_secs)).await)
297 }
298
299 fn process_writes(&mut self, txn: &mut Txn, mut writes: GPooled<Vec<WriteRequest>>) {
300 for req in writes.drain(..) {
302 let reply = req.send_result.map(Sendable::Write);
303 if let Some(p) = self.by_id.get(&req.id) {
304 txn.set_data(true, p.path.clone(), req.value, reply);
305 }
306 }
307 }
308
309 fn is_locked_gen(&self, path: &Path, parent_only: bool) -> bool {
310 let mut iter = self.locked.range::<str, (Bound<&str>, Bound<&str>)>((
311 Bound::Unbounded,
312 if parent_only {
313 Bound::Excluded(path.as_ref())
314 } else {
315 Bound::Included(path.as_ref())
316 },
317 ));
318 loop {
319 match iter.next_back() {
320 None => break false,
321 Some((p, locked)) if Path::is_parent(p, &path) => break *locked,
322 Some(_) => (),
323 }
324 }
325 }
326
327 fn is_locked(&self, path: &Path) -> bool {
328 self.is_locked_gen(path, false)
329 }
330
331 fn process_publish_request(&mut self, path: Path, reply: oneshot::Sender<()>) {
332 match self.check_path(path) {
333 Err(_) => {
334 let _: Result<_, _> = reply.send(());
336 }
337 Ok(path) => {
338 match self.db.lookup(path.as_ref()) {
339 Ok(Some(Datum::Data(v) | Datum::Formula(v, _))) => {
340 let _: Result<()> = self.publish_data(path, v);
341 }
342 Err(_) | Ok(Some(Datum::Deleted)) | Ok(None) => {
343 let locked = self.is_locked(&path);
344 let api = self
345 .api_path
346 .as_ref()
347 .map(|p| Path::is_parent(p, &path))
348 .unwrap_or(false);
349 if !locked && !api {
350 let _: Result<()> = self.publish_data(path, Value::Null);
351 };
352 }
353 }
354 let _: Result<_, _> = reply.send(());
355 }
356 }
357 }
358
359 fn process_publish_event(&mut self, e: PEvent) {
360 match e {
361 PEvent::Subscribe(_, _) | PEvent::Unsubscribe(_, _) => (),
362 PEvent::Destroyed(id) => {
363 if let Some(p) = self.by_id.remove(&id) {
364 self.by_path.remove(&p.path);
365 }
366 }
367 }
368 }
369
370 fn delete_path(&mut self, txn: &mut Txn, path: Path, reply: Reply) {
371 let path = or_reply!(reply, self.check_path(path));
372 let bn = Path::basename(&path);
373 if bn == Some(".formula") || bn == Some(".on-write") {
374 if let Some(path) = Path::dirname(&path) {
375 txn.remove(Path::from(ArcStr::from(path)), reply);
376 }
377 } else {
378 txn.remove(path, reply);
379 }
380 }
381
382 fn delete_subtree(&mut self, txn: &mut Txn, path: Path, reply: Reply) {
383 let path = or_reply!(reply, self.check_path(path));
384 txn.remove_subtree(path, reply);
385 }
386
387 fn lock_subtree(&mut self, txn: &mut Txn, path: Path, reply: Reply) {
388 let path = or_reply!(reply, self.check_path(path));
389 txn.set_locked(path, reply);
390 }
391
392 fn unlock_subtree(&mut self, txn: &mut Txn, path: Path, reply: Reply) {
393 let path = or_reply!(reply, self.check_path(path));
394 txn.set_unlocked(path, reply);
395 }
396
397 fn set_data(&mut self, txn: &mut Txn, path: Path, value: Value, reply: Reply) {
398 let path = or_reply!(reply, self.check_path(path));
399 txn.set_data(true, path, value, reply);
400 }
401
402 fn create_sheet(
403 &self,
404 txn: &mut Txn,
405 path: Path,
406 rows: usize,
407 columns: usize,
408 max_rows: usize,
409 max_columns: usize,
410 lock: bool,
411 reply: Reply,
412 ) {
413 let path = or_reply!(reply, self.check_path(path));
414 if rows > max_rows || columns > max_columns {
415 let m = literal!("rows <= max_rows && columns <= max_columns");
416 if let Some(reply) = reply {
417 reply.send(Value::error(m));
418 }
419 } else {
420 txn.create_sheet(path, rows, columns, max_rows, max_columns, lock, reply);
421 }
422 }
423
424 fn create_table(
425 &self,
426 txn: &mut Txn,
427 path: Path,
428 rows: Vec<ArcStr>,
429 columns: Vec<ArcStr>,
430 lock: bool,
431 reply: Reply,
432 ) {
433 let path = or_reply!(reply, self.check_path(path));
434 txn.create_table(path, rows, columns, lock, reply);
435 }
436
    /// Drain `reqs` and translate every rpc request into operations on `txn`.
    ///
    /// A `Packed` request carries a list of sub-requests. They are processed
    /// in order, each with a `Sendable::Packed` reply that shares one result
    /// slot; once all of them have been processed the content of the slot is
    /// sent as the single reply of the packed request. Any other request gets
    /// its own `Sendable::Rpc` reply.
    fn process_rpc_requests(&mut self, txn: &mut Txn, reqs: &mut Vec<RpcRequest>) {
        // Handles every request kind except `Packed`. Requests that go
        // through a method of `self` are checked against the roots first;
        // the sheet/table row and column operations and the root operations
        // are handed to the transaction directly.
        let mut process_non_packed = |reply: Sendable, req: RpcRequestKind| match req {
            RpcRequestKind::Delete(path) => self.delete_path(txn, path, Some(reply)),
            RpcRequestKind::DeleteSubtree(path) => {
                self.delete_subtree(txn, path, Some(reply))
            }
            RpcRequestKind::LockSubtree(path) => {
                self.lock_subtree(txn, path, Some(reply))
            }
            RpcRequestKind::UnlockSubtree(path) => {
                self.unlock_subtree(txn, path, Some(reply))
            }
            RpcRequestKind::SetData { path, value } => {
                self.set_data(txn, path, value, Some(reply))
            }
            RpcRequestKind::CreateSheet {
                path,
                rows,
                columns,
                max_rows,
                max_columns,
                lock,
            } => self.create_sheet(
                txn,
                path,
                rows,
                columns,
                max_rows,
                max_columns,
                lock,
                Some(reply),
            ),
            RpcRequestKind::AddSheetRows(path, rows) => {
                txn.add_sheet_rows(path, rows, Some(reply));
            }
            RpcRequestKind::AddSheetCols(path, cols) => {
                txn.add_sheet_columns(path, cols, Some(reply));
            }
            RpcRequestKind::DelSheetRows(path, rows) => {
                txn.del_sheet_rows(path, rows, Some(reply));
            }
            RpcRequestKind::DelSheetCols(path, cols) => {
                txn.del_sheet_columns(path, cols, Some(reply));
            }
            RpcRequestKind::CreateTable { path, rows, columns, lock } => {
                self.create_table(txn, path, rows, columns, lock, Some(reply))
            }
            RpcRequestKind::AddTableRows(path, rows) => {
                txn.add_table_rows(path, rows, Some(reply));
            }
            RpcRequestKind::AddTableCols(path, cols) => {
                txn.add_table_columns(path, cols, Some(reply));
            }
            RpcRequestKind::DelTableRows(path, rows) => {
                txn.del_table_rows(path, rows, Some(reply));
            }
            RpcRequestKind::DelTableCols(path, cols) => {
                txn.del_table_columns(path, cols, Some(reply));
            }
            RpcRequestKind::AddRoot(path) => {
                txn.add_root(path, Some(reply));
            }
            RpcRequestKind::DelRoot(path) => {
                txn.del_root(path, Some(reply));
            }
            // NOTE(review): a `Packed` nested inside a `Packed` panics here;
            // confirm the rpc decoder can never produce one.
            RpcRequestKind::Packed(_) => unreachable!(),
        };
        for mut req in reqs.drain(..) {
            match req.kind {
                RpcRequestKind::Packed(reqs) => {
                    // the result slot shared by all the sub-requests
                    let res = Arc::new(Mutex::new(Value::Null));
                    for req in reqs {
                        let reply = Sendable::Packed(res.clone());
                        process_non_packed(reply, req)
                    }
                    // take whatever is in the slot and send it as the one reply
                    req.reply.send(mem::replace(&mut *res.lock(), Value::Null));
                }
                k => {
                    let reply = Sendable::Rpc(req.reply);
                    process_non_packed(reply, k)
                }
            }
        }
    }
521
522 fn remove_deleted_published(&mut self, path: &Path) {
523 self.remove_advertisement(&path);
524 if let Some(p) = self.by_path.remove(path) {
525 self.by_id.remove(&p.val.id());
526 }
527 }
528
529 fn remove_advertisement(&self, path: &Path) {
530 if let Some((_, def)) = self.get_root(&path) {
531 def.remove_advertisement(&path);
532 }
533 }
534
535 fn advertise_path(&self, path: Path) {
536 if !self.params.sparse {
537 if let Some((_, def)) = self.get_root(&path) {
538 let _: Result<_> = def.advertise(path);
539 }
540 }
541 }
542
543 fn process_update(&mut self, batch: &mut UpdateBatch, mut update: db::Update) {
544 use db::UpdateKind;
545 let mut locked = false;
546 let mut roots = false;
547 for path in update.added_roots.drain(..) {
548 roots = true;
549 match self.publisher.publish_default(path.clone()) {
550 Err(e) => error!("failed to publish_default {path} {e:?}"),
551 Ok(dh) => {
552 self.roots.insert(path, dh);
553 }
554 }
555 }
556 for path in update.removed_roots.drain(..) {
557 roots = true;
558 self.roots.remove(&path);
559 }
560 for (path, value) in update.data.drain(..) {
561 match value {
562 UpdateKind::Updated(v) => {
563 if let Some(p) = self.by_path.get(&path) {
564 p.val.update(batch, v)
565 }
566 }
567 UpdateKind::Inserted(v) => match self.by_path.get(&path) {
568 Some(p) => p.val.update(batch, v),
569 None => self.advertise_path(path),
570 },
571 UpdateKind::Deleted => self.remove_deleted_published(&path),
572 }
573 }
574 for path in update.locked.drain(..) {
575 locked = true;
576 if self.is_locked_gen(&path, true) {
577 self.locked.remove(&path);
578 } else {
579 self.locked.insert(path, true);
580 }
581 }
582 for path in update.unlocked.drain(..) {
583 locked = true;
584 if !self.is_locked_gen(&path, true) {
585 self.locked.remove(&path);
586 } else {
587 self.locked.insert(path, false);
588 }
589 }
590 if locked {
591 if let Some(stats) = &mut self.stats {
593 let _: Result<_> = stats.set_locked(batch, &self.locked);
594 }
595 }
596 if roots {
597 if let Some(stats) = &mut self.stats {
599 let _: Result<_> = stats.set_roots(batch, &self.roots);
600 }
601 }
602 }
603
604 fn process_command(&mut self, txn: &mut Txn, c: ToInner) {
605 match c {
606 ToInner::GetDb(s) => {
607 let _ = s.send(self.db.clone());
608 }
609 ToInner::GetPub(s) => {
610 let _ = s.send(self.publisher.clone());
611 }
612 ToInner::Commit(t, s) => {
613 *txn = t;
614 let _ = s.send(());
615 }
616 }
617 }
618
    /// The main loop of the container task.
    ///
    /// Each iteration handles one event from one source: publisher events,
    /// on-demand publish requests from the roots, rpc requests, database
    /// updates, commands from `Container` handles, or subscriber writes.
    /// After each event a dirty transaction is handed to the database and a
    /// non-empty publisher batch is committed. When `select_biased!` reports
    /// `complete` the publisher is shut down and the database flushed.
    async fn run(mut self, mut cmd: mpsc::UnboundedReceiver<ToInner>) -> Result<()> {
        // rpc requests accumulate here until the api signals the end of a batch
        let mut rpcbatch = Vec::new();
        let mut batch = self.publisher.start_batch();
        let mut txn = Txn::new();
        // Next item from the rpc api, or a future that never resolves when
        // no api is configured, so the select arm below is always usable.
        async fn api_rx(api: &mut Option<RpcApi>) -> BatchItem<RpcRequest> {
            match api {
                Some(api) => api.rx.select_next_some().await,
                None => future::pending().await,
            }
        }
        loop {
            // NOTE(review): `select_biased!` is expected to favour the arms
            // in the order they are written — confirm before reordering.
            select_biased! {
                r = self.publish_events.select_next_some() => {
                    self.process_publish_event(r);
                },
                r = self.roots.select_next_some() => {
                    self.process_publish_request(r.0, r.1)
                },
                r = api_rx(&mut self.api).fuse() => match r {
                    BatchItem::InBatch(v) => rpcbatch.push(v),
                    BatchItem::EndBatch => self.process_rpc_requests(&mut txn, &mut rpcbatch)
                },
                u = self.db_updates.select_next_some() => {
                    self.process_update(&mut batch, u);
                },
                c = cmd.select_next_some() => {
                    self.process_command(&mut txn, c);
                },
                w = self.write_updates_rx.select_next_some() => {
                    self.process_writes(&mut txn, w);
                }
                complete => break,
            }
            // hand the accumulated changes to the db, leaving a fresh txn behind
            if txn.dirty() {
                self.db.commit(mem::replace(&mut txn, Txn::new()));
            }
            // flush pending value updates, swapping in a fresh batch first
            if batch.len() > 0 {
                let timeout = self.params.timeout.map(Duration::from_secs);
                let new_batch = self.publisher.start_batch();
                mem::replace(&mut batch, new_batch).commit(timeout).await;
            }
        }
        self.publisher.clone().shutdown().await;
        self.db.flush_async().await?;
        Ok(())
    }
665}
666
/// Commands sent from a `Container` handle to the container task. Every
/// variant carries a oneshot sender the task answers on.
enum ToInner {
    /// Ask for a clone of the database handle.
    GetDb(oneshot::Sender<Db>),
    /// Ask for a clone of the publisher.
    GetPub(oneshot::Sender<Publisher>),
    /// Hand a transaction to the task; acknowledged once it has been accepted.
    Commit(Txn, oneshot::Sender<()>),
}
672
/// A cheaply cloneable handle to a running container. It wraps the sending
/// side of the command channel read by the container task.
#[derive(Clone)]
pub struct Container(mpsc::UnboundedSender<ToInner>);
675
676impl Container {
677 pub async fn start(
683 cfg: Config,
684 auth: DesiredAuth,
685 params: Params,
686 ) -> Result<Container> {
687 let (w, r) = mpsc::unbounded();
688 let mut c = ContainerInner::new(cfg, auth, params).await?;
689 c.init().await?;
690 task::spawn(async move {
691 match c.run(r).await {
692 Err(e) => error!("container stopped with error {}", e),
693 Ok(()) => info!("container stopped gracefully"),
694 }
695 });
696 Ok(Container(w))
697 }
698
699 pub async fn db(&self) -> Result<Db> {
706 let (w, r) = oneshot::channel();
707 self.0.unbounded_send(ToInner::GetDb(w))?;
708 Ok(r.await?)
709 }
710
711 pub async fn commit(&self, txn: Txn) -> Result<()> {
714 let (w, r) = oneshot::channel();
715 self.0.unbounded_send(ToInner::Commit(txn, w))?;
716 Ok(r.await?)
717 }
718
719 pub fn commit_unbounded(&self, txn: Txn) -> Result<()> {
721 let (w, _r) = oneshot::channel();
722 Ok(self.0.unbounded_send(ToInner::Commit(txn, w))?)
723 }
724
725 pub async fn publisher(&self) -> Result<Publisher> {
727 let (w, r) = oneshot::channel();
728 self.0.unbounded_send(ToInner::GetPub(w))?;
729 Ok(r.await?)
730 }
731}