1use super::Params;
2use anyhow::{anyhow, bail, Result};
3use arcstr::ArcStr;
4use bytes::{Buf, BufMut};
5use futures::{
6 channel::{
7 mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
8 oneshot,
9 },
10 prelude::*,
11 select_biased,
12};
13use netidx::{
14 pack::{Pack, PackError},
15 path::Path,
16 publisher::{Publisher, SendResult, UpdateBatch, Val},
17 subscriber::Value,
18 utils::{BatchItem, Batched},
19};
20use netidx_protocols::rpc::server::RpcReply;
21use parking_lot::Mutex;
22use poolshark::global::{GPooled, Pool};
23use sled;
24use std::{
25 cmp::{max, min},
26 collections::{HashMap, HashSet},
27 fmt::Write,
28 path::PathBuf,
29 str,
30 sync::{
31 atomic::{AtomicUsize, Ordering},
32 LazyLock,
33 },
34 time::Duration,
35};
36use tokio::task;
37use triomphe::Arc;
38
39pub enum Sendable {
40 Packed(Arc<Mutex<Value>>),
41 Rpc(RpcReply),
42 Write(SendResult),
43}
44
45impl Sendable {
46 pub fn send(self, v: Value) {
47 match self {
48 Sendable::Packed(res) => {
49 let mut res = res.lock();
50 match &*res {
51 Value::Error(_) => (),
52 _ => {
53 *res = v;
54 }
55 }
56 }
57 Sendable::Rpc(mut reply) => {
58 reply.send(v);
59 }
60 Sendable::Write(reply) => {
61 reply.send(v);
62 }
63 }
64 }
65}
66
67pub type Reply = Option<Sendable>;
68type Txns = Vec<(TxnOp, Reply)>;
69
70static BUF: LazyLock<Pool<Vec<u8>>> = LazyLock::new(|| Pool::new(8, 16384));
71static PDPAIR: LazyLock<Pool<Vec<(Path, UpdateKind)>>> =
72 LazyLock::new(|| Pool::new(256, 8124));
73static PATHS: LazyLock<Pool<Vec<Path>>> = LazyLock::new(|| Pool::new(256, 65534));
74static TXNS: LazyLock<Pool<Txns>> = LazyLock::new(|| Pool::new(16, 65534));
75static STXNS: LazyLock<Pool<Txns>> = LazyLock::new(|| Pool::new(65534, 32));
76static BYPATH: LazyLock<Pool<HashMap<Path, GPooled<Txns>>>> =
77 LazyLock::new(|| Pool::new(16, 65534));
78
79pub(super) enum UpdateKind {
80 Deleted,
81 Inserted(Value),
82 Updated(Value),
83}
84
85pub(super) struct Update {
86 pub(super) data: GPooled<Vec<(Path, UpdateKind)>>,
87 pub(super) locked: GPooled<Vec<Path>>,
88 pub(super) unlocked: GPooled<Vec<Path>>,
89 pub(super) added_roots: GPooled<Vec<Path>>,
90 pub(super) removed_roots: GPooled<Vec<Path>>,
91}
92
93impl Update {
94 fn new() -> Update {
95 Update {
96 data: PDPAIR.take(),
97 locked: PATHS.take(),
98 unlocked: PATHS.take(),
99 added_roots: PATHS.take(),
100 removed_roots: PATHS.take(),
101 }
102 }
103
104 fn merge_from(&mut self, mut other: Update) {
105 self.data.extend(other.data.drain(..));
106 self.locked.extend(other.locked.drain(..));
107 self.unlocked.extend(other.unlocked.drain(..));
108 self.added_roots.extend(other.added_roots.drain(..));
109 self.removed_roots.extend(other.removed_roots.drain(..));
110 }
111
112 fn merge(mut self, other: Update) -> Update {
113 self.merge_from(other);
114 self
115 }
116}
117
118pub enum DatumKind {
119 Data,
120 Formula,
121 Deleted,
122 Invalid,
123}
124
125impl DatumKind {
126 fn decode(buf: &mut impl Buf) -> DatumKind {
127 match buf.get_u8() {
128 0 => DatumKind::Data,
129 1 => DatumKind::Formula,
130 2 => DatumKind::Deleted,
131 _ => DatumKind::Invalid,
132 }
133 }
134}
135
136#[derive(Debug, Clone)]
137pub enum Datum {
138 Data(Value),
139 Formula(Value, Value),
140 Deleted,
141}
142
143impl Pack for Datum {
144 fn encoded_len(&self) -> usize {
145 1 + match self {
146 Datum::Data(v) => v.encoded_len(),
147 Datum::Formula(f, w) => f.encoded_len() + w.encoded_len(),
148 Datum::Deleted => 0,
149 }
150 }
151
152 fn encode(&self, buf: &mut impl BufMut) -> Result<(), PackError> {
153 match self {
154 Datum::Data(v) => {
155 buf.put_u8(0);
156 Pack::encode(v, buf)
157 }
158 Datum::Formula(f, w) => {
159 buf.put_u8(1);
160 Pack::encode(f, buf)?;
161 Pack::encode(w, buf)
162 }
163 Datum::Deleted => Ok(buf.put_u8(2)),
164 }
165 }
166
167 fn decode(buf: &mut impl Buf) -> Result<Self, PackError> {
168 if buf.remaining() == 0 {
169 Err(PackError::InvalidFormat)
170 } else {
171 match buf.get_u8() {
172 0 => Ok(Datum::Data(Value::decode(buf)?)),
173 1 => {
174 let f = Value::decode(buf)?;
175 let w = Value::decode(buf)?;
176 Ok(Datum::Formula(f, w))
177 }
178 2 => Ok(Datum::Deleted),
179 _ => Err(PackError::UnknownTag),
180 }
181 }
182 }
183}
184
185fn lookup_value<P: AsRef<[u8]>>(tree: &sled::Tree, path: P) -> Result<Option<Datum>> {
186 match tree.get(path.as_ref())? {
187 None => Ok(None),
188 Some(v) => Ok(Some(Datum::decode(&mut &*v)?)),
189 }
190}
191
192fn iter_paths(tree: &sled::Tree) -> impl Iterator<Item = Result<Path>> + 'static {
193 tree.iter().keys().map(|res| Ok(Path::from(ArcStr::from(str::from_utf8(&res?)?))))
194}
195
196enum TxnOp {
197 Remove(Path),
198 SetData(bool, Path, Value),
199 CreateSheet {
200 base: Path,
201 rows: usize,
202 max_rows: usize,
203 max_columns: usize,
204 cols: usize,
205 lock: bool,
206 },
207 AddSheetColumns {
208 base: Path,
209 cols: usize,
210 },
211 AddSheetRows {
212 base: Path,
213 rows: usize,
214 },
215 DelSheetColumns {
216 base: Path,
217 cols: usize,
218 },
219 DelSheetRows {
220 base: Path,
221 rows: usize,
222 },
223 CreateTable {
224 base: Path,
225 rows: Vec<ArcStr>,
226 cols: Vec<ArcStr>,
227 lock: bool,
228 },
229 AddTableColumns {
230 base: Path,
231 cols: Vec<ArcStr>,
232 },
233 AddTableRows {
234 base: Path,
235 rows: Vec<ArcStr>,
236 },
237 DelTableColumns {
238 base: Path,
239 cols: Vec<ArcStr>,
240 },
241 DelTableRows {
242 base: Path,
243 rows: Vec<ArcStr>,
244 },
245 SetLocked(Path),
246 SetUnlocked(Path),
247 AddRoot(Path),
248 DelRoot(Path),
249 RemoveSubtree(Path),
250 Flush(oneshot::Sender<()>),
251}
252
253impl TxnOp {
254 fn path(&self) -> Path {
255 use TxnOp::*;
256 match self {
257 Remove(p) => p.clone(),
258 SetData(_, p, _) => p.clone(),
259 CreateSheet { base, .. } => base.clone(),
260 AddSheetColumns { base, .. } => base.clone(),
261 AddSheetRows { base, .. } => base.clone(),
262 DelSheetColumns { base, .. } => base.clone(),
263 DelSheetRows { base, .. } => base.clone(),
264 CreateTable { base, .. } => base.clone(),
265 AddTableColumns { base, .. } => base.clone(),
266 AddTableRows { base, .. } => base.clone(),
267 DelTableColumns { base, .. } => base.clone(),
268 DelTableRows { base, .. } => base.clone(),
269 SetLocked(p) => p.clone(),
270 SetUnlocked(p) => p.clone(),
271 RemoveSubtree(p) => p.clone(),
272 AddRoot(p) => p.clone(),
273 DelRoot(p) => p.clone(),
274 Flush(_) => Path::root(),
275 }
276 }
277}
278
279pub struct Txn(GPooled<Txns>);
280
281impl Txn {
282 pub fn new() -> Self {
283 Self(TXNS.take())
284 }
285
286 pub fn dirty(&self) -> bool {
287 self.0.len() > 0
288 }
289
290 pub fn remove(&mut self, path: Path, reply: Reply) {
291 self.0.push((TxnOp::Remove(path), reply))
292 }
293
294 pub fn set_data(&mut self, update: bool, path: Path, value: Value, reply: Reply) {
295 self.0.push((TxnOp::SetData(update, path, value), reply))
296 }
297
298 pub fn create_sheet(
299 &mut self,
300 base: Path,
301 rows: usize,
302 cols: usize,
303 max_rows: usize,
304 max_columns: usize,
305 lock: bool,
306 reply: Reply,
307 ) {
308 self.0.push((
309 TxnOp::CreateSheet { base, rows, cols, max_rows, max_columns, lock },
310 reply,
311 ))
312 }
313
314 pub fn add_sheet_columns(&mut self, base: Path, cols: usize, reply: Reply) {
315 self.0.push((TxnOp::AddSheetColumns { base, cols }, reply))
316 }
317
318 pub fn add_sheet_rows(&mut self, base: Path, rows: usize, reply: Reply) {
319 self.0.push((TxnOp::AddSheetRows { base, rows }, reply))
320 }
321
322 pub fn del_sheet_columns(&mut self, base: Path, cols: usize, reply: Reply) {
323 self.0.push((TxnOp::DelSheetColumns { base, cols }, reply))
324 }
325
326 pub fn del_sheet_rows(&mut self, base: Path, rows: usize, reply: Reply) {
327 self.0.push((TxnOp::DelSheetRows { base, rows }, reply))
328 }
329
330 pub fn create_table(
331 &mut self,
332 base: Path,
333 rows: Vec<ArcStr>,
334 cols: Vec<ArcStr>,
335 lock: bool,
336 reply: Reply,
337 ) {
338 self.0.push((TxnOp::CreateTable { base, rows, cols, lock }, reply))
339 }
340
341 pub fn add_table_columns(&mut self, base: Path, cols: Vec<ArcStr>, reply: Reply) {
342 self.0.push((TxnOp::AddTableColumns { base, cols }, reply))
343 }
344
345 pub fn add_table_rows(&mut self, base: Path, rows: Vec<ArcStr>, reply: Reply) {
346 self.0.push((TxnOp::AddTableRows { base, rows }, reply))
347 }
348
349 pub fn del_table_columns(&mut self, base: Path, cols: Vec<ArcStr>, reply: Reply) {
350 self.0.push((TxnOp::DelTableColumns { base, cols }, reply))
351 }
352
353 pub fn del_table_rows(&mut self, base: Path, rows: Vec<ArcStr>, reply: Reply) {
354 self.0.push((TxnOp::DelTableRows { base, rows }, reply))
355 }
356
357 pub fn set_locked(&mut self, path: Path, reply: Reply) {
358 self.0.push((TxnOp::SetLocked(path), reply))
359 }
360
361 pub fn set_unlocked(&mut self, path: Path, reply: Reply) {
362 self.0.push((TxnOp::SetUnlocked(path), reply))
363 }
364
365 pub fn add_root(&mut self, path: Path, reply: Reply) {
366 self.0.push((TxnOp::AddRoot(path), reply));
367 }
368
369 pub fn del_root(&mut self, path: Path, reply: Reply) {
370 self.0.push((TxnOp::DelRoot(path), reply));
371 }
372
373 pub fn remove_subtree(&mut self, path: Path, reply: Reply) {
374 self.0.push((TxnOp::RemoveSubtree(path), reply))
375 }
376}
377
378fn remove(data: &sled::Tree, pending: &mut Update, path: Path) -> Result<()> {
379 let key = path.as_bytes();
380 let mut val = BUF.take();
381 Datum::Deleted.encode(&mut *val)?;
382 if let Some(data) = data.insert(key, &**val)? {
383 match DatumKind::decode(&mut &*data) {
384 DatumKind::Data | DatumKind::Formula => {
385 pending.data.push((path, UpdateKind::Deleted))
386 }
387 DatumKind::Deleted | DatumKind::Invalid => (),
388 }
389 }
390 Ok(())
391}
392
393fn set_data(
394 data: &sled::Tree,
395 pending: &mut Update,
396 update: bool,
397 path: Path,
398 value: Value,
399) -> Result<()> {
400 let key = path.as_bytes();
401 let mut val = BUF.take();
402 let datum = Datum::Data(value.clone());
403 datum.encode(&mut *val)?;
404 let up = match data.insert(key, &**val)? {
405 None => UpdateKind::Inserted(value),
406 Some(data) => match DatumKind::decode(&mut &*data) {
407 DatumKind::Data => UpdateKind::Updated(value),
408 DatumKind::Formula | DatumKind::Deleted | DatumKind::Invalid => {
409 UpdateKind::Inserted(value)
410 }
411 },
412 };
413 if update {
414 pending.data.push((path, up));
415 }
416 Ok(())
417}
418
419fn merge_err(e0: Result<()>, e1: Result<()>) -> Result<()> {
420 match (e0, e1) {
421 (Ok(()), Ok(())) => Ok(()),
422 (Err(e), Err(_)) => Err(e),
423 (Err(e), Ok(())) => Err(e),
424 (Ok(()), Err(e)) => Err(e),
425 }
426}
427
428fn create_sheet(
429 data: &sled::Tree,
430 locked: &sled::Tree,
431 pending: &mut Update,
432 base: Path,
433 rows: usize,
434 cols: usize,
435 max_rows: usize,
436 max_columns: usize,
437 lock: bool,
438) -> Result<()> {
439 use rayon::prelude::*;
440 let rd = 1 + (max(rows, max_rows) as f32).log10() as usize;
441 let cd = 1 + (max(cols, max_columns) as f32).log10() as usize;
442 let (up, res) = (0..rows)
443 .into_par_iter()
444 .map(|row| (0..cols).into_par_iter().map(move |col| (row, col)))
445 .flatten()
446 .fold(
447 || (Update::new(), String::new(), Ok(())),
448 |(mut pending, mut buf, res), (i, j)| {
449 buf.clear();
450 write!(buf, "{:0rwidth$}/{:0cwidth$}", i, j, rwidth = rd, cwidth = cd)
451 .unwrap();
452 let path = base.append(buf.as_str());
453 let res = match data.contains_key(path.as_bytes()) {
454 Ok(false) => {
455 let r = set_data(data, &mut pending, true, path, Value::Null);
456 merge_err(r, res)
457 }
458 Ok(true) => res,
459 Err(e) => Err(e.into()),
460 };
461 (pending, buf, res)
462 },
463 )
464 .map(|(u, _, r)| (u, r))
465 .reduce(
466 || (Update::new(), Ok(())),
467 |(u0, r0), (u1, r1)| (u0.merge(u1), merge_err(r0, r1)),
468 );
469 pending.merge_from(up);
470 if lock {
471 set_locked(locked, pending, base)?
472 }
473 res
474}
475
476struct SheetDescr {
477 rows: GPooled<Vec<Path>>,
478 max_col: usize,
479 max_col_width: usize,
480}
481
482impl SheetDescr {
483 fn new(data: &sled::Tree, base: &Path) -> Result<Self> {
484 let base_levels = Path::levels(base);
485 let mut rows = PATHS.take();
486 let mut max_col = 0;
487 let mut max_col_width = 0;
488 for r in data.scan_prefix(base.as_bytes()).keys() {
489 if let Ok(k) = r {
490 if let Ok(path) = str::from_utf8(&*k) {
491 if Path::is_parent(base, path) {
492 let mut row = path;
493 let mut level = Path::levels(row);
494 loop {
495 if level == base_levels + 1 {
496 break;
497 } else if level == base_levels + 2 {
498 if let Some(col) = Path::basename(row) {
499 if let Ok(c) = col.parse::<usize>() {
500 max_col_width = max(max_col_width, col.len());
501 max_col = max(c, max_col);
502 }
503 }
504 }
505 row = Path::dirname(row).unwrap_or("/");
506 level -= 1;
507 }
508 match rows.last() {
509 None => rows.push(Path::from(ArcStr::from(row))),
510 Some(last) => {
511 if last.as_ref() != row {
512 rows.push(Path::from(ArcStr::from(row)));
513 }
514 }
515 }
516 }
517 }
518 }
519 }
520 Ok(Self { rows, max_col, max_col_width })
521 }
522}
523
524fn add_sheet_columns(
525 data: &sled::Tree,
526 pending: &mut Update,
527 base: Path,
528 cols: usize,
529) -> Result<()> {
530 use rayon::prelude::*;
531 let mut cs = SheetDescr::new(data, &base)?;
532 let max_col = cs.max_col;
533 let max_col_width = cs.max_col_width;
534 if cs.max_col_width < 1 + ((cs.max_col + cols + 1) as f32).log10() as usize {
535 bail!("columns full")
536 }
537 let (up, res) = cs
538 .rows
539 .par_drain(..)
540 .fold(
541 || (Update::new(), String::new(), Ok(())),
542 |(mut pending, mut buf, mut res), row| {
543 for i in 1..cols + 1 {
544 let col = max_col + i;
545 buf.clear();
546 write!(buf, "{}/{:0cwidth$}", row, col, cwidth = max_col_width)
547 .unwrap();
548 let path = Path::from(ArcStr::from(buf.as_str()));
549 res = match data.contains_key(path.as_bytes()) {
550 Ok(false) => {
551 let r = set_data(data, &mut pending, true, path, Value::Null);
552 merge_err(r, res)
553 }
554 Ok(true) => res,
555 Err(e) => Err(e.into()),
556 }
557 }
558 (pending, buf, res)
559 },
560 )
561 .map(|(u, _, r)| (u, r))
562 .reduce(
563 || (Update::new(), Ok(())),
564 |(u0, r0), (u1, r1)| (u0.merge(u1), merge_err(r0, r1)),
565 );
566 pending.merge_from(up);
567 res
568}
569
570fn del_sheet_columns(
571 data: &sled::Tree,
572 pending: &mut Update,
573 base: Path,
574 cols: usize,
575) -> Result<()> {
576 use rayon::prelude::*;
577 let mut cs = SheetDescr::new(data, &base)?;
578 let max_col = cs.max_col;
579 let max_col_width = cs.max_col_width;
580 let cols = min(cs.max_col, cols);
581 let (up, res) = cs
582 .rows
583 .par_drain(..)
584 .fold(
585 || (Update::new(), String::new(), Ok(())),
586 |(mut pending, mut buf, mut res), row| {
587 for col in (max_col - cols..max_col + 1).rev() {
588 buf.clear();
589 write!(buf, "{}/{:0cw$}", row, col, cw = max_col_width).unwrap();
590 let path = Path::from(ArcStr::from(buf.as_str()));
591 res = merge_err(remove(data, &mut pending, path), res);
592 }
593 (pending, buf, res)
594 },
595 )
596 .map(|(u, _, r)| (u, r))
597 .reduce(
598 || (Update::new(), Ok(())),
599 |(u0, r0), (u1, r1)| (u0.merge(u1), merge_err(r0, r1)),
600 );
601 pending.merge_from(up);
602 res
603}
604
605fn add_sheet_rows(
606 data: &sled::Tree,
607 pending: &mut Update,
608 base: Path,
609 rows: usize,
610) -> Result<()> {
611 use rayon::prelude::*;
612 let base_levels = Path::levels(&base);
613 let mut max_row = 0;
614 let mut max_col = 0;
615 let mut max_row_width = 0;
616 let mut max_col_width = 0;
617 let mut cols = HashSet::new();
618 for r in data.scan_prefix(base.as_bytes()).keys() {
619 let k = r?;
620 let path = str::from_utf8(&k)?;
621 if Path::is_parent(&base, path) {
622 if Path::levels(path) == base_levels + 2 {
623 if let Some(row) = Path::dirname(path) {
624 if let Some(row) = Path::basename(row) {
625 if let Ok(i) = row.parse::<usize>() {
626 max_row = max(i, max_row);
627 max_row_width = max(row.len(), max_row_width);
628 }
629 }
630 }
631 let col = Path::basename(path)
632 .and_then(|p| p.parse::<usize>().ok().map(move |i| (p, i)))
633 .map(|(col, i)| {
634 max_col = max(i, max_col);
635 max_col_width = max(col.len(), max_col_width);
636 i
637 });
638 if let Some(col) = col {
639 cols.insert(col);
640 }
641 }
642 }
643 }
644 if max_row_width < 1 + ((max_row + rows + 1) as f32).log10() as usize {
645 bail!("sheet is full")
646 }
647 let (up, res) = (max_row + 1..max_row + rows)
648 .into_par_iter()
649 .fold(
650 || (Update::new(), String::new(), Ok(())),
651 |(mut pending, mut buf, mut res), row| {
652 for col in &cols {
653 buf.clear();
654 write!(
655 buf,
656 "{:0rwidth$}/{:0cwidth$}",
657 row,
658 col,
659 rwidth = max_row_width,
660 cwidth = max_col_width,
661 )
662 .unwrap();
663 let path = base.append(&buf);
664 res = match data.contains_key(path.as_bytes()) {
665 Ok(false) => {
666 let r = set_data(data, &mut pending, true, path, Value::Null);
667 merge_err(r, res)
668 }
669 Ok(true) => res,
670 Err(e) => Err(e.into()),
671 }
672 }
673 (pending, buf, res)
674 },
675 )
676 .map(|(u, _, r)| (u, r))
677 .reduce(
678 || (Update::new(), Ok(())),
679 |(u0, r0), (u1, r1)| (u0.merge(u1), merge_err(r0, r1)),
680 );
681 pending.merge_from(up);
682 res
683}
684
685fn del_sheet_rows(
686 data: &sled::Tree,
687 pending: &mut Update,
688 base: Path,
689 rows: usize,
690) -> Result<()> {
691 use rayon::prelude::*;
692 let mut cs = SheetDescr::new(data, &base)?;
693 let len = cs.rows.len();
694 let rows = min(len, rows);
695 let (up, res) = cs
696 .rows
697 .par_drain(len - rows..len + 1)
698 .fold(
699 || (Update::new(), Ok(())),
700 |(mut pending, res), row| {
701 let res = merge_err(remove_subtree(data, &mut pending, row), res);
702 (pending, res)
703 },
704 )
705 .reduce(
706 || (Update::new(), Ok(())),
707 |(u0, r0), (u1, r1)| (u0.merge(u1), merge_err(r0, r1)),
708 );
709 pending.merge_from(up);
710 res
711}
712
713fn create_table(
714 data: &sled::Tree,
715 locked: &sled::Tree,
716 pending: &mut Update,
717 base: Path,
718 rows: Vec<ArcStr>,
719 cols: Vec<ArcStr>,
720 lock: bool,
721) -> Result<()> {
722 use rayon::prelude::*;
723 let rows: Vec<String> =
724 rows.into_iter().map(|c| Path::escape(&c).into_owned()).collect();
725 let cols: Vec<String> =
726 cols.into_iter().map(|c| Path::escape(&c).into_owned()).collect();
727 let (up, res) = rows
728 .par_iter()
729 .map(|row| cols.par_iter().map(move |col| (row, col)))
730 .flatten()
731 .fold(
732 || (Update::new(), String::new(), Ok(())),
733 |(mut pending, mut buf, res), (row, col)| {
734 buf.clear();
735 write!(buf, "{}/{}", row, col).unwrap();
736 let path = base.append(buf.as_str());
737 let res = match data.contains_key(path.as_bytes()) {
738 Ok(false) => {
739 let r = set_data(data, &mut pending, true, path, Value::Null);
740 merge_err(r, res)
741 }
742 Ok(true) => res,
743 Err(e) => Err(e.into()),
744 };
745 (pending, buf, res)
746 },
747 )
748 .map(|(u, _, r)| (u, r))
749 .reduce(
750 || (Update::new(), Ok(())),
751 |(u0, r0), (u1, r1)| (u0.merge(u1), merge_err(r0, r1)),
752 );
753 pending.merge_from(up);
754 if lock {
755 set_locked(locked, pending, base)?
756 }
757 res
758}
759
760fn table_rows(data: &sled::Tree, base: &Path) -> Result<GPooled<Vec<Path>>> {
761 let base_levels = Path::levels(&base);
762 let mut paths = PATHS.take();
763 for r in data.scan_prefix(base.as_bytes()).keys() {
764 let k = r?;
765 if let Ok(path) = str::from_utf8(&*k) {
766 if Path::is_parent(base, path) {
767 let mut row = path;
768 let mut level = Path::levels(row);
769 while level > base_levels + 1 {
770 row = Path::dirname(row).unwrap_or("/");
771 level -= 1;
772 }
773 match paths.last() {
774 None => paths.push(Path::from(ArcStr::from(row))),
775 Some(last) => {
776 if last.as_ref() != row {
777 paths.push(Path::from(ArcStr::from(row)));
778 }
779 }
780 }
781 }
782 }
783 }
784 Ok(paths)
785}
786
787fn add_table_columns(
788 data: &sled::Tree,
789 pending: &mut Update,
790 base: Path,
791 cols: Vec<ArcStr>,
792) -> Result<()> {
793 use rayon::prelude::*;
794 let cols: Vec<String> =
795 cols.into_iter().map(|c| Path::escape(&c).into_owned()).collect();
796 let (up, res) = table_rows(data, &base)?
797 .par_drain(..)
798 .fold(
799 || (Update::new(), Ok(())),
800 |(mut pending, mut res), row| {
801 for col in &cols {
802 let path = row.append(col);
803 res = match data.contains_key(path.as_bytes()) {
804 Ok(false) => {
805 let r = set_data(data, &mut pending, true, path, Value::Null);
806 merge_err(r, res)
807 }
808 Ok(true) => res,
809 Err(e) => Err(e.into()),
810 }
811 }
812 (pending, res)
813 },
814 )
815 .reduce(
816 || (Update::new(), Ok(())),
817 |(u0, r0), (u1, r1)| (u0.merge(u1), merge_err(r0, r1)),
818 );
819 pending.merge_from(up);
820 res
821}
822
823fn del_table_columns(
824 data: &sled::Tree,
825 pending: &mut Update,
826 base: Path,
827 cols: Vec<ArcStr>,
828) -> Result<()> {
829 use rayon::prelude::*;
830 let cols: Vec<String> =
831 cols.into_iter().map(|c| Path::escape(&c).into_owned()).collect();
832 let (up, res) = table_rows(data, &base)?
833 .par_drain(..)
834 .fold(
835 || (Update::new(), Ok(())),
836 |(mut pending, mut res), row| {
837 for col in &cols {
838 let path = row.append(col);
839 res = merge_err(remove(data, &mut pending, path), res);
840 }
841 (pending, res)
842 },
843 )
844 .reduce(
845 || (Update::new(), Ok(())),
846 |(u0, r0), (u1, r1)| (u0.merge(u1), merge_err(r0, r1)),
847 );
848 pending.merge_from(up);
849 res
850}
851
852fn add_table_rows(
853 data: &sled::Tree,
854 pending: &mut Update,
855 base: Path,
856 rows: Vec<ArcStr>,
857) -> Result<()> {
858 use rayon::prelude::*;
859 let rows: Vec<String> =
860 rows.into_iter().map(|c| Path::escape(&c).into_owned()).collect();
861 let base_levels = Path::levels(&base);
862 let mut cols = HashSet::new();
863 for r in data.scan_prefix(base.as_bytes()).keys() {
864 let k = r?;
865 let path = str::from_utf8(&k)?;
866 if Path::is_parent(&base, path) {
867 if Path::levels(path) == base_levels + 2 {
868 let col = Path::basename(path).unwrap_or("");
869 cols.insert(k.subslice(k.len() - col.len(), col.len()));
870 }
871 }
872 }
873 let cols: HashSet<String> = cols
874 .into_iter()
875 .filter_map(|k| str::from_utf8(&k).ok().map(String::from))
876 .collect();
877 let (up, res) = rows
878 .into_par_iter()
879 .fold(
880 || (Update::new(), String::new(), Ok(())),
881 |(mut pending, mut buf, mut res), row| {
882 for col in &cols {
883 buf.clear();
884 write!(buf, "{}/{}", row, col).unwrap();
885 let path = base.append(&buf);
886 res = match data.contains_key(path.as_bytes()) {
887 Ok(false) => {
888 let r = set_data(data, &mut pending, true, path, Value::Null);
889 merge_err(r, res)
890 }
891 Ok(true) => res,
892 Err(e) => Err(e.into()),
893 }
894 }
895 (pending, buf, res)
896 },
897 )
898 .map(|(u, _, r)| (u, r))
899 .reduce(
900 || (Update::new(), Ok(())),
901 |(u0, r0), (u1, r1)| (u0.merge(u1), merge_err(r0, r1)),
902 );
903 pending.merge_from(up);
904 res
905}
906
907fn del_table_rows(
908 data: &sled::Tree,
909 pending: &mut Update,
910 base: Path,
911 rows: Vec<ArcStr>,
912) -> Result<()> {
913 use rayon::prelude::*;
914 let rows: Vec<String> =
915 rows.into_iter().map(|c| Path::escape(&c).into_owned()).collect();
916 let (up, res) = rows
917 .into_par_iter()
918 .fold(
919 || (Update::new(), Ok(())),
920 |(mut pending, res), row| {
921 let path = base.append(&row);
922 let res = merge_err(remove_subtree(data, &mut pending, path), res);
923 (pending, res)
924 },
925 )
926 .reduce(
927 || (Update::new(), Ok(())),
928 |(u0, r0), (u1, r1)| (u0.merge(u1), merge_err(r0, r1)),
929 );
930 pending.merge_from(up);
931 res
932}
933
934fn is_locked(locked: &sled::Tree, path: &Path, parent_only: bool) -> Result<bool> {
935 let mut iter = if parent_only {
936 locked.range(..path.as_bytes())
937 } else {
938 locked.range(..=path.as_bytes())
939 };
940 loop {
941 match iter.next_back() {
942 None => break Ok(false),
943 Some(r) => {
944 let (k, v) = r?;
945 let k = str::from_utf8(&k)?;
946 if Path::is_parent(k, &path) {
947 break Ok(&*v == &[1u8]);
948 }
949 }
950 }
951 }
952}
953
954fn set_locked(locked: &sled::Tree, pending: &mut Update, path: Path) -> Result<()> {
955 if is_locked(locked, &path, true)? {
956 locked.remove(path.as_bytes())?;
957 } else {
958 locked.insert(path.as_bytes(), &[1u8])?;
959 }
960 pending.locked.push(path);
961 Ok(())
962}
963
964fn set_unlocked(locked: &sled::Tree, pending: &mut Update, path: Path) -> Result<()> {
965 if !is_locked(locked, &path, true)? {
966 locked.remove(path.as_bytes())?;
967 } else {
968 locked.insert(path.as_bytes(), &[0u8])?;
969 }
970 pending.unlocked.push(path);
971 Ok(())
972}
973
974fn remove_subtree(data: &sled::Tree, pending: &mut Update, path: Path) -> Result<()> {
975 use rayon::prelude::*;
976 let mut paths = PATHS.take();
977 for res in data.scan_prefix(path.as_ref()).keys() {
978 let key = res?;
979 let key = str::from_utf8(&key)?;
980 if Path::is_parent(&path, &key) {
981 paths.push(Path::from(ArcStr::from(key)));
982 }
983 }
984 let (up, res) = paths
985 .par_drain(..)
986 .fold(
987 || (Update::new(), Ok(())),
988 |(mut pending, res), path| {
989 let res = merge_err(remove(data, &mut pending, path), res);
990 (pending, res)
991 },
992 )
993 .reduce(
994 || (Update::new(), Ok(())),
995 |(u0, r0), (u1, r1)| (u0.merge(u1), merge_err(r0, r1)),
996 );
997 pending.merge_from(up);
998 res
999}
1000
1001fn add_root(roots: &sled::Tree, pending: &mut Update, path: Path) -> Result<()> {
1002 let key = path.as_bytes();
1003 if let Some(r) = roots.range(..key).next_back() {
1004 let (prev, _) = r?;
1005 let prev = str::from_utf8(&prev)?;
1006 if Path::is_parent(prev, &path) {
1007 bail!("a parent path is already a root")
1008 }
1009 }
1010 if !roots.contains_key(key)? {
1011 roots.insert(key, &[])?;
1012 pending.added_roots.push(path);
1013 }
1014 Ok(())
1015}
1016
1017fn del_root(
1018 data: &sled::Tree,
1019 roots: &sled::Tree,
1020 locked: &sled::Tree,
1021 pending: &mut Update,
1022 path: Path,
1023) -> Result<()> {
1024 let key = path.as_bytes();
1025 if roots.contains_key(key)? {
1026 let mut iter = roots.range(..key).keys();
1027 let remove = loop {
1028 match iter.next_back() {
1029 None => break false,
1030 Some(r) => {
1031 let k = r?;
1032 let k = str::from_utf8(&k)?;
1033 if Path::is_parent(k, &path) {
1034 break true;
1035 }
1036 }
1037 }
1038 };
1039 if remove {
1040 remove_subtree(data, pending, path.clone())?
1041 }
1042 for r in roots.scan_prefix(key).keys() {
1043 let k = r?;
1044 let k = str::from_utf8(&k)?;
1045 if Path::is_parent(&path, k) {
1046 roots.remove(&k)?;
1047 pending.removed_roots.push(Path::from(ArcStr::from(k)));
1048 }
1049 }
1050 for r in locked.scan_prefix(key).keys() {
1051 let k = r?;
1052 let k = str::from_utf8(&k)?;
1053 if Path::is_parent(&path, k) {
1054 locked.remove(&k)?;
1055 pending.unlocked.push(Path::from(ArcStr::from(k)));
1056 }
1057 }
1058 }
1059 Ok(())
1060}
1061
1062fn send_reply(reply: Reply, r: Result<()>) {
1063 match (r, reply) {
1064 (Ok(()), Some(reply)) => {
1065 reply.send(Value::Null);
1066 }
1067 (Err(e), Some(reply)) => {
1068 let e = Value::error(format!("{}", e));
1069 reply.send(e);
1070 }
1071 (_, None) => (),
1072 }
1073}
1074
1075fn commit_complex(
1076 data: &sled::Tree,
1077 locked: &sled::Tree,
1078 roots: &sled::Tree,
1079 mut txn: Txn,
1080) -> Update {
1081 let mut pending = Update::new();
1082 for (op, reply) in txn.0.drain(..) {
1083 let r = match op {
1084 TxnOp::CreateSheet { base, rows, cols, max_rows, max_columns, lock } => {
1085 create_sheet(
1086 &data,
1087 &locked,
1088 &mut pending,
1089 base,
1090 rows,
1091 cols,
1092 max_rows,
1093 max_columns,
1094 lock,
1095 )
1096 }
1097 TxnOp::AddSheetColumns { base, cols } => {
1098 add_sheet_columns(&data, &mut pending, base, cols)
1099 }
1100 TxnOp::AddSheetRows { base, rows } => {
1101 add_sheet_rows(&data, &mut pending, base, rows)
1102 }
1103 TxnOp::DelSheetColumns { base, cols } => {
1104 del_sheet_columns(&data, &mut pending, base, cols)
1105 }
1106 TxnOp::DelSheetRows { base, rows } => {
1107 del_sheet_rows(&data, &mut pending, base, rows)
1108 }
1109 TxnOp::CreateTable { base, rows, cols, lock } => {
1110 create_table(&data, &locked, &mut pending, base, rows, cols, lock)
1111 }
1112 TxnOp::AddTableColumns { base, cols } => {
1113 add_table_columns(&data, &mut pending, base, cols)
1114 }
1115 TxnOp::AddTableRows { base, rows } => {
1116 add_table_rows(&data, &mut pending, base, rows)
1117 }
1118 TxnOp::DelTableColumns { base, cols } => {
1119 del_table_columns(&data, &mut pending, base, cols)
1120 }
1121 TxnOp::DelTableRows { base, rows } => {
1122 del_table_rows(&data, &mut pending, base, rows)
1123 }
1124 TxnOp::Remove(path) => remove(&data, &mut pending, path),
1125 TxnOp::RemoveSubtree(path) => remove_subtree(&data, &mut pending, path),
1126 TxnOp::SetData(update, path, value) => {
1127 set_data(&data, &mut pending, update, path, value)
1128 }
1129 TxnOp::SetLocked(path) => set_locked(&locked, &mut pending, path),
1130 TxnOp::SetUnlocked(path) => set_unlocked(&locked, &mut pending, path),
1131 TxnOp::AddRoot(path) => add_root(&roots, &mut pending, path),
1132 TxnOp::DelRoot(path) => del_root(&data, &roots, &locked, &mut pending, path),
1133 TxnOp::Flush(finished) => {
1134 let _: Result<_, _> = data.flush();
1135 let _: Result<_, _> = locked.flush();
1136 let _: Result<_, _> = finished.send(());
1137 Ok(())
1138 }
1139 };
1140 send_reply(reply, r);
1141 }
1142 pending
1143}
1144
1145fn commit_simple(data: &sled::Tree, locked: &sled::Tree, mut txn: Txn) -> Update {
1146 use rayon::prelude::*;
1147 let mut by_path = BYPATH.take();
1148 for (op, reply) in txn.0.drain(..) {
1149 by_path.entry(op.path()).or_insert_with(|| STXNS.take()).push((op, reply));
1150 }
1151 by_path
1152 .par_drain()
1153 .fold(
1154 || Update::new(),
1155 |mut pending, (_, mut ops)| {
1156 for (op, reply) in ops.drain(..) {
1157 let r = match op {
1158 TxnOp::SetData(update, path, value) => {
1159 set_data(data, &mut pending, update, path, value)
1160 }
1161 TxnOp::Remove(path) => remove(data, &mut pending, path),
1162 TxnOp::SetLocked(path) => set_locked(locked, &mut pending, path),
1163 TxnOp::SetUnlocked(path) => {
1164 set_unlocked(locked, &mut pending, path)
1165 }
1166 TxnOp::CreateSheet { .. }
1167 | TxnOp::AddSheetColumns { .. }
1168 | TxnOp::AddSheetRows { .. }
1169 | TxnOp::DelSheetColumns { .. }
1170 | TxnOp::DelSheetRows { .. }
1171 | TxnOp::CreateTable { .. }
1172 | TxnOp::AddTableColumns { .. }
1173 | TxnOp::AddTableRows { .. }
1174 | TxnOp::DelTableColumns { .. }
1175 | TxnOp::DelTableRows { .. }
1176 | TxnOp::RemoveSubtree { .. }
1177 | TxnOp::AddRoot(_)
1178 | TxnOp::DelRoot(_)
1179 | TxnOp::Flush(_) => unreachable!(),
1180 };
1181 send_reply(reply, r)
1182 }
1183 pending
1184 },
1185 )
1186 .reduce(|| Update::new(), |u0, u1| u0.merge(u1))
1187}
1188
1189struct Stats {
1190 publisher: Publisher,
1191 busy: Val,
1192 queued: Val,
1193 queued_cnt: AtomicUsize,
1194 deleting: Val,
1195 to_commit: UnboundedSender<UpdateBatch>,
1196}
1197
1198async fn stats_commit_task(rx: UnboundedReceiver<UpdateBatch>) {
1199 let mut rx = Batched::new(rx, 1_000_000);
1200 let mut pending: Option<UpdateBatch> = None;
1201 while let Some(batch) = rx.next().await {
1202 match batch {
1203 BatchItem::InBatch(mut b) => match &mut pending {
1204 Some(pending) => {
1205 let _: Result<_> = pending.merge_from(&mut b);
1206 }
1207 None => {
1208 pending = Some(b);
1209 }
1210 },
1211 BatchItem::EndBatch => {
1212 if let Some(pending) = pending.take() {
1213 pending.commit(Some(Duration::from_secs(10))).await
1214 }
1215 }
1216 }
1217 }
1218}
1219
1220impl Stats {
1221 fn new(publisher: Publisher, base_path: Path) -> Result<Arc<Self>> {
1222 let busy = publisher.publish(base_path.append("busy"), false)?;
1223 let queued = publisher.publish(base_path.append("queued"), 0)?;
1224 let deleting = publisher.publish(base_path.append("background-delete"), false)?;
1225 let (tx, rx) = unbounded();
1226 task::spawn(stats_commit_task(rx));
1227 Ok(Arc::new(Stats {
1228 publisher,
1229 busy,
1230 queued,
1231 queued_cnt: AtomicUsize::new(0),
1232 deleting,
1233 to_commit: tx,
1234 }))
1235 }
1236
1237 fn inc_queued(&self) {
1238 let n = self.queued_cnt.fetch_add(1, Ordering::Relaxed) + 1;
1239 let mut batch = self.publisher.start_batch();
1240 self.queued.update(&mut batch, n);
1241 let _: Result<_, _> = self.to_commit.unbounded_send(batch);
1242 }
1243
1244 fn dec_queued(&self) {
1245 let n = self.queued_cnt.fetch_sub(1, Ordering::Relaxed) - 1;
1246 let mut batch = self.publisher.start_batch();
1247 self.queued.update(&mut batch, n);
1248 self.busy.update(&mut batch, true);
1249 let _: Result<_, _> = self.to_commit.unbounded_send(batch);
1250 }
1251
1252 fn set_busy(&self, busy: bool) {
1253 let mut batch = self.publisher.start_batch();
1254 self.busy.update(&mut batch, busy);
1255 let _: Result<_, _> = self.to_commit.unbounded_send(batch);
1256 }
1257
1258 fn set_deleting(&self, deleting: bool) {
1259 let mut batch = self.publisher.start_batch();
1260 self.deleting.update(&mut batch, deleting);
1261 let _: Result<_, _> = self.to_commit.unbounded_send(batch);
1262 }
1263}
1264
1265async fn background_delete_task(data: sled::Tree, stats: Option<Arc<Stats>>) {
1266 if let Some(stats) = &stats {
1267 stats.set_deleting(true);
1268 }
1269 task::block_in_place(|| {
1270 for r in data.iter() {
1271 if let Ok((k, v)) = r {
1272 if let DatumKind::Deleted = DatumKind::decode(&mut &*v) {
1273 let _: Result<_, _> =
1275 data.compare_and_swap(k, Some(v), None::<sled::IVec>);
1276 }
1277 }
1278 }
1279 });
1280 if let Some(stats) = &stats {
1281 stats.set_deleting(false);
1282 }
1283}
1284
1285async fn commit_txns_task(
1286 stats: Option<Arc<Stats>>,
1287 data: sled::Tree,
1288 locked: sled::Tree,
1289 roots: sled::Tree,
1290 incoming: UnboundedReceiver<Txn>,
1291 outgoing: UnboundedSender<Update>,
1292) {
1293 let mut incoming = incoming.fuse();
1294 async fn wait_delete_task(jh: &mut Option<task::JoinHandle<()>>) {
1295 match jh {
1296 Some(jh) => {
1297 let _: Result<_, _> = jh.await;
1299 }
1300 None => future::pending().await,
1301 }
1302 }
1303 let mut delete_task: Option<task::JoinHandle<()>> = None;
1304 let mut delete_required = false;
1305 loop {
1306 select_biased! {
1307 () = wait_delete_task(&mut delete_task).fuse() => {
1308 delete_task = None;
1309 }
1310 txn = incoming.select_next_some() => {
1311 if let Some(stats) = &stats {
1312 stats.dec_queued();
1313 }
1314 let (simple, delete) =
1315 txn.0.iter().fold((true, false), |(simple, delete), op| match &op.0 {
1316 TxnOp::CreateSheet { .. }
1317 | TxnOp::AddSheetColumns { .. }
1318 | TxnOp::AddSheetRows { .. }
1319 | TxnOp::CreateTable { .. }
1320 | TxnOp::AddTableColumns { .. }
1321 | TxnOp::AddTableRows { .. }
1322 | TxnOp::AddRoot(_)
1323 | TxnOp::Flush(_) => (false, delete),
1324 TxnOp::RemoveSubtree { .. }
1325 | TxnOp::DelTableColumns { .. }
1326 | TxnOp::DelTableRows { .. }
1327 | TxnOp::DelSheetColumns { .. }
1328 | TxnOp::DelSheetRows { .. }
1329 | TxnOp::DelRoot(_) => (false, true),
1330 TxnOp::Remove(_) => (simple, true),
1331 TxnOp::SetData(_, _, _)
1332 | TxnOp::SetLocked(_)
1333 | TxnOp::SetUnlocked(_) => (simple, delete),
1334 });
1335 delete_required |= delete;
1336 let pending = if simple {
1337 task::block_in_place(|| commit_simple(&data, &locked, txn))
1338 } else {
1339 task::block_in_place(|| commit_complex(&data, &locked, &roots, txn))
1340 };
1341 if let Some(stats) = &stats {
1342 stats.set_busy(false);
1343 }
1344 match outgoing.unbounded_send(pending) {
1345 Ok(()) => (),
1346 Err(_) => break,
1347 }
1348 if delete_required && delete_task.is_none() {
1349 let job = background_delete_task(data.clone(), stats.clone());
1350 delete_required = false;
1351 delete_task = Some(task::spawn(job));
1352 }
1353 }
1354 complete => break,
1355 }
1356 }
1357}
1358
1359#[derive(Clone)]
1360pub struct Db {
1361 db: sled::Db,
1362 data: sled::Tree,
1363 locked: sled::Tree,
1364 roots: sled::Tree,
1365 submit_txn: UnboundedSender<Txn>,
1366 stats: Option<Arc<Stats>>,
1367}
1368
1369impl Db {
1370 pub(super) fn new(
1371 cfg: &Params,
1372 publisher: Publisher,
1373 base_path: Option<Path>,
1374 ) -> Result<(Self, UnboundedReceiver<Update>)> {
1375 let stats = match base_path {
1376 None => None,
1377 Some(p) => Some(Stats::new(publisher, p)?),
1378 };
1379 let path = cfg
1380 .db
1381 .as_ref()
1382 .map(PathBuf::from)
1383 .or_else(Params::default_db_path)
1384 .ok_or_else(|| {
1385 anyhow!("db dir not specified and no default could be determined")
1386 })?;
1387 let db = sled::Config::default()
1388 .cache_capacity(cfg.cache_size.unwrap_or(16 * 1024 * 1024))
1389 .path(&path)
1390 .open()?;
1391 let data = db.open_tree("data")?;
1392 let locked = db.open_tree("locked")?;
1393 let roots = db.open_tree("roots")?;
1394 let (tx_incoming, rx_incoming) = unbounded();
1395 let (tx_outgoing, rx_outgoing) = unbounded();
1396 task::spawn(commit_txns_task(
1397 stats.clone(),
1398 data.clone(),
1399 locked.clone(),
1400 roots.clone(),
1401 rx_incoming,
1402 tx_outgoing,
1403 ));
1404 Ok((Db { db, data, locked, roots, submit_txn: tx_incoming, stats }, rx_outgoing))
1405 }
1406
1407 pub fn open_tree(&self, name: &str) -> Result<sled::Tree> {
1408 if name == "data" || name == "locked" || name == "roots" {
1409 bail!("tree name reserved")
1410 }
1411 Ok(self.db.open_tree(name)?)
1412 }
1413
1414 pub(super) fn commit(&self, txn: Txn) {
1415 if let Some(stats) = &self.stats {
1416 stats.inc_queued();
1417 }
1418 let _: Result<_, _> = self.submit_txn.unbounded_send(txn);
1419 }
1420
1421 pub(super) async fn flush_async(&self) -> Result<()> {
1422 let (tx, rx) = oneshot::channel();
1423 let mut txn = Txn::new();
1424 txn.0.push((TxnOp::Flush(tx), None));
1425 self.commit(txn);
1426 let _: Result<_, _> = rx.await;
1427 Ok(())
1428 }
1429
1430 pub fn relative_column(&self, base: &Path, offset: i32) -> Result<Option<Path>> {
1431 use std::ops::Bound::{self, *};
1432 let rowbase = Path::dirname(base).ok_or_else(|| anyhow!("no row"))?;
1433 let mut i = 0;
1434 if offset == 0 {
1435 Ok(Some(base.clone()))
1436 } else if offset < 0 {
1437 let mut iter = self
1438 .data
1439 .range::<&str, (Bound<&str>, Bound<&str>)>((Unbounded, Excluded(&**base)))
1440 .keys();
1441 while let Some(r) = iter.next_back() {
1442 let r = r?;
1443 let path = str::from_utf8(&r)?;
1444 if !Path::is_parent(&base, path) {
1445 return Ok(None);
1446 } else if Path::dirname(path) == Some(rowbase) {
1447 i -= 1;
1448 if i == offset {
1449 return Ok(Some(Path::from(ArcStr::from(path))));
1450 }
1451 } else {
1452 return Ok(None);
1453 }
1454 }
1455 Ok(None)
1456 } else {
1457 let iter = self
1458 .data
1459 .range::<&str, (Bound<&str>, Bound<&str>)>((Excluded(&**base), Unbounded))
1460 .keys();
1461 for r in iter {
1462 let r = r?;
1463 let path = str::from_utf8(&r)?;
1464 if !Path::is_parent(&base, path) {
1465 return Ok(None);
1466 } else if Path::dirname(path) == Some(rowbase) {
1467 i += 1;
1468 if i == offset {
1469 return Ok(Some(Path::from(ArcStr::from(path))));
1470 }
1471 } else {
1472 return Ok(None);
1473 }
1474 }
1475 Ok(None)
1476 }
1477 }
1478
1479 pub fn relative_row(&self, base: &Path, offset: i32) -> Result<Option<Path>> {
1480 use std::ops::Bound::{self, *};
1481 macro_rules! or_none {
1482 ($e:expr) => {
1483 match $e {
1484 Some(p) => p,
1485 None => return Ok(None),
1486 }
1487 };
1488 }
1489 let column = or_none!(Path::basename(base));
1490 let mut rowbase = ArcStr::from(or_none!(Path::dirname(base)));
1491 let tablebase = ArcStr::from(or_none!(Path::dirname(rowbase.as_str())));
1492 let mut i = 0;
1493 if offset == 0 {
1494 Ok(Some(base.clone()))
1495 } else if offset < 0 {
1496 let mut iter = self
1497 .data
1498 .range::<&str, (Bound<&str>, Bound<&str>)>((Unbounded, Excluded(&**base)))
1499 .keys();
1500 while let Some(r) = iter.next_back() {
1501 let r = r?;
1502 let path = str::from_utf8(&r)?;
1503 let path_column = or_none!(Path::basename(path));
1504 let path_row = or_none!(Path::dirname(path));
1505 let path_table = or_none!(Path::dirname(path_row));
1506 if !Path::is_parent(&base, path) {
1507 return Ok(None);
1508 } else if path_table == tablebase {
1509 if path_row != rowbase.as_str() {
1510 i -= 1;
1511 rowbase = ArcStr::from(path_row);
1512 }
1513 if i == offset && path_column == column {
1514 return Ok(Some(Path::from(ArcStr::from(path))));
1515 }
1516 if i < offset {
1517 return Ok(None);
1518 }
1519 } else {
1520 return Ok(None);
1521 }
1522 }
1523 Ok(None)
1524 } else {
1525 let iter = self
1526 .data
1527 .range::<&str, (Bound<&str>, Bound<&str>)>((Excluded(&**base), Unbounded))
1528 .keys();
1529 for r in iter {
1530 let r = r?;
1531 let path = str::from_utf8(&r)?;
1532 let path_column = or_none!(Path::basename(path));
1533 let path_row = or_none!(Path::dirname(path));
1534 let path_table = or_none!(Path::dirname(path_row));
1535 if !Path::is_parent(&base, path) {
1536 return Ok(None);
1537 } else if path_table == tablebase {
1538 if path_row != rowbase.as_str() {
1539 i += 1;
1540 rowbase = ArcStr::from(path_row);
1541 }
1542 if i == offset && path_column == column {
1543 return Ok(Some(Path::from(ArcStr::from(path))));
1544 }
1545 if i > offset {
1546 return Ok(None);
1547 }
1548 } else {
1549 return Ok(None);
1550 }
1551 }
1552 Ok(None)
1553 }
1554 }
1555
1556 pub fn lookup<P: AsRef<[u8]>>(&self, path: P) -> Result<Option<Datum>> {
1557 lookup_value(&self.data, path)
1558 }
1559
1560 pub fn lookup_value<P: AsRef<[u8]>>(&self, path: P) -> Option<Value> {
1561 self.lookup(path).ok().flatten().and_then(|d| match d {
1562 Datum::Deleted | Datum::Formula(_, _) => None,
1563 Datum::Data(v) => Some(v),
1564 })
1565 }
1566
1567 fn decode_iter(
1568 iter: sled::Iter,
1569 ) -> impl Iterator<Item = Result<(Path, DatumKind, sled::IVec)>> + 'static {
1570 iter.map(|res| {
1571 let (key, val) = res?;
1572 let path = Path::from(ArcStr::from(str::from_utf8(&key)?));
1573 let value = DatumKind::decode(&mut &*val);
1574 Ok((path, value, val))
1575 })
1576 }
1577
1578 pub fn iter(
1579 &self,
1580 ) -> impl Iterator<Item = Result<(Path, DatumKind, sled::IVec)>> + 'static {
1581 Self::decode_iter(self.data.iter())
1582 }
1583
1584 pub fn iter_prefix(
1585 &self,
1586 prefix: Path,
1587 ) -> impl Iterator<Item = Result<(Path, DatumKind, sled::IVec)>> + 'static {
1588 Self::decode_iter(self.data.scan_prefix(&*prefix))
1589 }
1590
1591 pub fn prefix_len(&self, prefix: &Path) -> usize {
1593 self.data.scan_prefix(&**prefix).count()
1594 }
1595
1596 pub fn locked(&self) -> impl Iterator<Item = Result<(Path, bool)>> + 'static {
1597 self.locked.iter().map(|r| {
1598 let (k, v) = r?;
1599 let path = Path::from(ArcStr::from(str::from_utf8(&k)?));
1600 let locked = &*v == &[1u8];
1601 Ok((path, locked))
1602 })
1603 }
1604
1605 pub fn roots(&self) -> impl Iterator<Item = Result<Path>> + 'static {
1606 iter_paths(&self.roots)
1607 }
1608
1609 pub fn clear(&self) -> Result<()> {
1610 self.db.clear()?;
1611 self.data.clear()?;
1612 self.locked.clear()?;
1613 Ok(self.roots.clear()?)
1614 }
1615}