netidx_container/
db.rs

1use super::Params;
2use anyhow::{anyhow, bail, Result};
3use arcstr::ArcStr;
4use bytes::{Buf, BufMut};
5use futures::{
6    channel::{
7        mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
8        oneshot,
9    },
10    prelude::*,
11    select_biased,
12};
13use netidx::{
14    pack::{Pack, PackError},
15    path::Path,
16    publisher::{Publisher, SendResult, UpdateBatch, Val},
17    subscriber::Value,
18    utils::{BatchItem, Batched},
19};
20use netidx_protocols::rpc::server::RpcReply;
21use parking_lot::Mutex;
22use poolshark::global::{GPooled, Pool};
23use sled;
24use std::{
25    cmp::{max, min},
26    collections::{HashMap, HashSet},
27    fmt::Write,
28    path::PathBuf,
29    str,
30    sync::{
31        atomic::{AtomicUsize, Ordering},
32        LazyLock,
33    },
34    time::Duration,
35};
36use tokio::task;
37use triomphe::Arc;
38
39pub enum Sendable {
40    Packed(Arc<Mutex<Value>>),
41    Rpc(RpcReply),
42    Write(SendResult),
43}
44
45impl Sendable {
46    pub fn send(self, v: Value) {
47        match self {
48            Sendable::Packed(res) => {
49                let mut res = res.lock();
50                match &*res {
51                    Value::Error(_) => (),
52                    _ => {
53                        *res = v;
54                    }
55                }
56            }
57            Sendable::Rpc(mut reply) => {
58                reply.send(v);
59            }
60            Sendable::Write(reply) => {
61                reply.send(v);
62            }
63        }
64    }
65}
66
67pub type Reply = Option<Sendable>;
68type Txns = Vec<(TxnOp, Reply)>;
69
70static BUF: LazyLock<Pool<Vec<u8>>> = LazyLock::new(|| Pool::new(8, 16384));
71static PDPAIR: LazyLock<Pool<Vec<(Path, UpdateKind)>>> =
72    LazyLock::new(|| Pool::new(256, 8124));
73static PATHS: LazyLock<Pool<Vec<Path>>> = LazyLock::new(|| Pool::new(256, 65534));
74static TXNS: LazyLock<Pool<Txns>> = LazyLock::new(|| Pool::new(16, 65534));
75static STXNS: LazyLock<Pool<Txns>> = LazyLock::new(|| Pool::new(65534, 32));
76static BYPATH: LazyLock<Pool<HashMap<Path, GPooled<Txns>>>> =
77    LazyLock::new(|| Pool::new(16, 65534));
78
79pub(super) enum UpdateKind {
80    Deleted,
81    Inserted(Value),
82    Updated(Value),
83}
84
85pub(super) struct Update {
86    pub(super) data: GPooled<Vec<(Path, UpdateKind)>>,
87    pub(super) locked: GPooled<Vec<Path>>,
88    pub(super) unlocked: GPooled<Vec<Path>>,
89    pub(super) added_roots: GPooled<Vec<Path>>,
90    pub(super) removed_roots: GPooled<Vec<Path>>,
91}
92
93impl Update {
94    fn new() -> Update {
95        Update {
96            data: PDPAIR.take(),
97            locked: PATHS.take(),
98            unlocked: PATHS.take(),
99            added_roots: PATHS.take(),
100            removed_roots: PATHS.take(),
101        }
102    }
103
104    fn merge_from(&mut self, mut other: Update) {
105        self.data.extend(other.data.drain(..));
106        self.locked.extend(other.locked.drain(..));
107        self.unlocked.extend(other.unlocked.drain(..));
108        self.added_roots.extend(other.added_roots.drain(..));
109        self.removed_roots.extend(other.removed_roots.drain(..));
110    }
111
112    fn merge(mut self, other: Update) -> Update {
113        self.merge_from(other);
114        self
115    }
116}
117
118pub enum DatumKind {
119    Data,
120    Formula,
121    Deleted,
122    Invalid,
123}
124
125impl DatumKind {
126    fn decode(buf: &mut impl Buf) -> DatumKind {
127        match buf.get_u8() {
128            0 => DatumKind::Data,
129            1 => DatumKind::Formula,
130            2 => DatumKind::Deleted,
131            _ => DatumKind::Invalid,
132        }
133    }
134}
135
136#[derive(Debug, Clone)]
137pub enum Datum {
138    Data(Value),
139    Formula(Value, Value),
140    Deleted,
141}
142
143impl Pack for Datum {
144    fn encoded_len(&self) -> usize {
145        1 + match self {
146            Datum::Data(v) => v.encoded_len(),
147            Datum::Formula(f, w) => f.encoded_len() + w.encoded_len(),
148            Datum::Deleted => 0,
149        }
150    }
151
152    fn encode(&self, buf: &mut impl BufMut) -> Result<(), PackError> {
153        match self {
154            Datum::Data(v) => {
155                buf.put_u8(0);
156                Pack::encode(v, buf)
157            }
158            Datum::Formula(f, w) => {
159                buf.put_u8(1);
160                Pack::encode(f, buf)?;
161                Pack::encode(w, buf)
162            }
163            Datum::Deleted => Ok(buf.put_u8(2)),
164        }
165    }
166
167    fn decode(buf: &mut impl Buf) -> Result<Self, PackError> {
168        if buf.remaining() == 0 {
169            Err(PackError::InvalidFormat)
170        } else {
171            match buf.get_u8() {
172                0 => Ok(Datum::Data(Value::decode(buf)?)),
173                1 => {
174                    let f = Value::decode(buf)?;
175                    let w = Value::decode(buf)?;
176                    Ok(Datum::Formula(f, w))
177                }
178                2 => Ok(Datum::Deleted),
179                _ => Err(PackError::UnknownTag),
180            }
181        }
182    }
183}
184
185fn lookup_value<P: AsRef<[u8]>>(tree: &sled::Tree, path: P) -> Result<Option<Datum>> {
186    match tree.get(path.as_ref())? {
187        None => Ok(None),
188        Some(v) => Ok(Some(Datum::decode(&mut &*v)?)),
189    }
190}
191
192fn iter_paths(tree: &sled::Tree) -> impl Iterator<Item = Result<Path>> + 'static {
193    tree.iter().keys().map(|res| Ok(Path::from(ArcStr::from(str::from_utf8(&res?)?))))
194}
195
196enum TxnOp {
197    Remove(Path),
198    SetData(bool, Path, Value),
199    CreateSheet {
200        base: Path,
201        rows: usize,
202        max_rows: usize,
203        max_columns: usize,
204        cols: usize,
205        lock: bool,
206    },
207    AddSheetColumns {
208        base: Path,
209        cols: usize,
210    },
211    AddSheetRows {
212        base: Path,
213        rows: usize,
214    },
215    DelSheetColumns {
216        base: Path,
217        cols: usize,
218    },
219    DelSheetRows {
220        base: Path,
221        rows: usize,
222    },
223    CreateTable {
224        base: Path,
225        rows: Vec<ArcStr>,
226        cols: Vec<ArcStr>,
227        lock: bool,
228    },
229    AddTableColumns {
230        base: Path,
231        cols: Vec<ArcStr>,
232    },
233    AddTableRows {
234        base: Path,
235        rows: Vec<ArcStr>,
236    },
237    DelTableColumns {
238        base: Path,
239        cols: Vec<ArcStr>,
240    },
241    DelTableRows {
242        base: Path,
243        rows: Vec<ArcStr>,
244    },
245    SetLocked(Path),
246    SetUnlocked(Path),
247    AddRoot(Path),
248    DelRoot(Path),
249    RemoveSubtree(Path),
250    Flush(oneshot::Sender<()>),
251}
252
253impl TxnOp {
254    fn path(&self) -> Path {
255        use TxnOp::*;
256        match self {
257            Remove(p) => p.clone(),
258            SetData(_, p, _) => p.clone(),
259            CreateSheet { base, .. } => base.clone(),
260            AddSheetColumns { base, .. } => base.clone(),
261            AddSheetRows { base, .. } => base.clone(),
262            DelSheetColumns { base, .. } => base.clone(),
263            DelSheetRows { base, .. } => base.clone(),
264            CreateTable { base, .. } => base.clone(),
265            AddTableColumns { base, .. } => base.clone(),
266            AddTableRows { base, .. } => base.clone(),
267            DelTableColumns { base, .. } => base.clone(),
268            DelTableRows { base, .. } => base.clone(),
269            SetLocked(p) => p.clone(),
270            SetUnlocked(p) => p.clone(),
271            RemoveSubtree(p) => p.clone(),
272            AddRoot(p) => p.clone(),
273            DelRoot(p) => p.clone(),
274            Flush(_) => Path::root(),
275        }
276    }
277}
278
279pub struct Txn(GPooled<Txns>);
280
281impl Txn {
282    pub fn new() -> Self {
283        Self(TXNS.take())
284    }
285
286    pub fn dirty(&self) -> bool {
287        self.0.len() > 0
288    }
289
290    pub fn remove(&mut self, path: Path, reply: Reply) {
291        self.0.push((TxnOp::Remove(path), reply))
292    }
293
294    pub fn set_data(&mut self, update: bool, path: Path, value: Value, reply: Reply) {
295        self.0.push((TxnOp::SetData(update, path, value), reply))
296    }
297
298    pub fn create_sheet(
299        &mut self,
300        base: Path,
301        rows: usize,
302        cols: usize,
303        max_rows: usize,
304        max_columns: usize,
305        lock: bool,
306        reply: Reply,
307    ) {
308        self.0.push((
309            TxnOp::CreateSheet { base, rows, cols, max_rows, max_columns, lock },
310            reply,
311        ))
312    }
313
314    pub fn add_sheet_columns(&mut self, base: Path, cols: usize, reply: Reply) {
315        self.0.push((TxnOp::AddSheetColumns { base, cols }, reply))
316    }
317
318    pub fn add_sheet_rows(&mut self, base: Path, rows: usize, reply: Reply) {
319        self.0.push((TxnOp::AddSheetRows { base, rows }, reply))
320    }
321
322    pub fn del_sheet_columns(&mut self, base: Path, cols: usize, reply: Reply) {
323        self.0.push((TxnOp::DelSheetColumns { base, cols }, reply))
324    }
325
326    pub fn del_sheet_rows(&mut self, base: Path, rows: usize, reply: Reply) {
327        self.0.push((TxnOp::DelSheetRows { base, rows }, reply))
328    }
329
330    pub fn create_table(
331        &mut self,
332        base: Path,
333        rows: Vec<ArcStr>,
334        cols: Vec<ArcStr>,
335        lock: bool,
336        reply: Reply,
337    ) {
338        self.0.push((TxnOp::CreateTable { base, rows, cols, lock }, reply))
339    }
340
341    pub fn add_table_columns(&mut self, base: Path, cols: Vec<ArcStr>, reply: Reply) {
342        self.0.push((TxnOp::AddTableColumns { base, cols }, reply))
343    }
344
345    pub fn add_table_rows(&mut self, base: Path, rows: Vec<ArcStr>, reply: Reply) {
346        self.0.push((TxnOp::AddTableRows { base, rows }, reply))
347    }
348
349    pub fn del_table_columns(&mut self, base: Path, cols: Vec<ArcStr>, reply: Reply) {
350        self.0.push((TxnOp::DelTableColumns { base, cols }, reply))
351    }
352
353    pub fn del_table_rows(&mut self, base: Path, rows: Vec<ArcStr>, reply: Reply) {
354        self.0.push((TxnOp::DelTableRows { base, rows }, reply))
355    }
356
357    pub fn set_locked(&mut self, path: Path, reply: Reply) {
358        self.0.push((TxnOp::SetLocked(path), reply))
359    }
360
361    pub fn set_unlocked(&mut self, path: Path, reply: Reply) {
362        self.0.push((TxnOp::SetUnlocked(path), reply))
363    }
364
365    pub fn add_root(&mut self, path: Path, reply: Reply) {
366        self.0.push((TxnOp::AddRoot(path), reply));
367    }
368
369    pub fn del_root(&mut self, path: Path, reply: Reply) {
370        self.0.push((TxnOp::DelRoot(path), reply));
371    }
372
373    pub fn remove_subtree(&mut self, path: Path, reply: Reply) {
374        self.0.push((TxnOp::RemoveSubtree(path), reply))
375    }
376}
377
378fn remove(data: &sled::Tree, pending: &mut Update, path: Path) -> Result<()> {
379    let key = path.as_bytes();
380    let mut val = BUF.take();
381    Datum::Deleted.encode(&mut *val)?;
382    if let Some(data) = data.insert(key, &**val)? {
383        match DatumKind::decode(&mut &*data) {
384            DatumKind::Data | DatumKind::Formula => {
385                pending.data.push((path, UpdateKind::Deleted))
386            }
387            DatumKind::Deleted | DatumKind::Invalid => (),
388        }
389    }
390    Ok(())
391}
392
393fn set_data(
394    data: &sled::Tree,
395    pending: &mut Update,
396    update: bool,
397    path: Path,
398    value: Value,
399) -> Result<()> {
400    let key = path.as_bytes();
401    let mut val = BUF.take();
402    let datum = Datum::Data(value.clone());
403    datum.encode(&mut *val)?;
404    let up = match data.insert(key, &**val)? {
405        None => UpdateKind::Inserted(value),
406        Some(data) => match DatumKind::decode(&mut &*data) {
407            DatumKind::Data => UpdateKind::Updated(value),
408            DatumKind::Formula | DatumKind::Deleted | DatumKind::Invalid => {
409                UpdateKind::Inserted(value)
410            }
411        },
412    };
413    if update {
414        pending.data.push((path, up));
415    }
416    Ok(())
417}
418
419fn merge_err(e0: Result<()>, e1: Result<()>) -> Result<()> {
420    match (e0, e1) {
421        (Ok(()), Ok(())) => Ok(()),
422        (Err(e), Err(_)) => Err(e),
423        (Err(e), Ok(())) => Err(e),
424        (Ok(()), Err(e)) => Err(e),
425    }
426}
427
428fn create_sheet(
429    data: &sled::Tree,
430    locked: &sled::Tree,
431    pending: &mut Update,
432    base: Path,
433    rows: usize,
434    cols: usize,
435    max_rows: usize,
436    max_columns: usize,
437    lock: bool,
438) -> Result<()> {
439    use rayon::prelude::*;
440    let rd = 1 + (max(rows, max_rows) as f32).log10() as usize;
441    let cd = 1 + (max(cols, max_columns) as f32).log10() as usize;
442    let (up, res) = (0..rows)
443        .into_par_iter()
444        .map(|row| (0..cols).into_par_iter().map(move |col| (row, col)))
445        .flatten()
446        .fold(
447            || (Update::new(), String::new(), Ok(())),
448            |(mut pending, mut buf, res), (i, j)| {
449                buf.clear();
450                write!(buf, "{:0rwidth$}/{:0cwidth$}", i, j, rwidth = rd, cwidth = cd)
451                    .unwrap();
452                let path = base.append(buf.as_str());
453                let res = match data.contains_key(path.as_bytes()) {
454                    Ok(false) => {
455                        let r = set_data(data, &mut pending, true, path, Value::Null);
456                        merge_err(r, res)
457                    }
458                    Ok(true) => res,
459                    Err(e) => Err(e.into()),
460                };
461                (pending, buf, res)
462            },
463        )
464        .map(|(u, _, r)| (u, r))
465        .reduce(
466            || (Update::new(), Ok(())),
467            |(u0, r0), (u1, r1)| (u0.merge(u1), merge_err(r0, r1)),
468        );
469    pending.merge_from(up);
470    if lock {
471        set_locked(locked, pending, base)?
472    }
473    res
474}
475
476struct SheetDescr {
477    rows: GPooled<Vec<Path>>,
478    max_col: usize,
479    max_col_width: usize,
480}
481
482impl SheetDescr {
483    fn new(data: &sled::Tree, base: &Path) -> Result<Self> {
484        let base_levels = Path::levels(base);
485        let mut rows = PATHS.take();
486        let mut max_col = 0;
487        let mut max_col_width = 0;
488        for r in data.scan_prefix(base.as_bytes()).keys() {
489            if let Ok(k) = r {
490                if let Ok(path) = str::from_utf8(&*k) {
491                    if Path::is_parent(base, path) {
492                        let mut row = path;
493                        let mut level = Path::levels(row);
494                        loop {
495                            if level == base_levels + 1 {
496                                break;
497                            } else if level == base_levels + 2 {
498                                if let Some(col) = Path::basename(row) {
499                                    if let Ok(c) = col.parse::<usize>() {
500                                        max_col_width = max(max_col_width, col.len());
501                                        max_col = max(c, max_col);
502                                    }
503                                }
504                            }
505                            row = Path::dirname(row).unwrap_or("/");
506                            level -= 1;
507                        }
508                        match rows.last() {
509                            None => rows.push(Path::from(ArcStr::from(row))),
510                            Some(last) => {
511                                if last.as_ref() != row {
512                                    rows.push(Path::from(ArcStr::from(row)));
513                                }
514                            }
515                        }
516                    }
517                }
518            }
519        }
520        Ok(Self { rows, max_col, max_col_width })
521    }
522}
523
524fn add_sheet_columns(
525    data: &sled::Tree,
526    pending: &mut Update,
527    base: Path,
528    cols: usize,
529) -> Result<()> {
530    use rayon::prelude::*;
531    let mut cs = SheetDescr::new(data, &base)?;
532    let max_col = cs.max_col;
533    let max_col_width = cs.max_col_width;
534    if cs.max_col_width < 1 + ((cs.max_col + cols + 1) as f32).log10() as usize {
535        bail!("columns full")
536    }
537    let (up, res) = cs
538        .rows
539        .par_drain(..)
540        .fold(
541            || (Update::new(), String::new(), Ok(())),
542            |(mut pending, mut buf, mut res), row| {
543                for i in 1..cols + 1 {
544                    let col = max_col + i;
545                    buf.clear();
546                    write!(buf, "{}/{:0cwidth$}", row, col, cwidth = max_col_width)
547                        .unwrap();
548                    let path = Path::from(ArcStr::from(buf.as_str()));
549                    res = match data.contains_key(path.as_bytes()) {
550                        Ok(false) => {
551                            let r = set_data(data, &mut pending, true, path, Value::Null);
552                            merge_err(r, res)
553                        }
554                        Ok(true) => res,
555                        Err(e) => Err(e.into()),
556                    }
557                }
558                (pending, buf, res)
559            },
560        )
561        .map(|(u, _, r)| (u, r))
562        .reduce(
563            || (Update::new(), Ok(())),
564            |(u0, r0), (u1, r1)| (u0.merge(u1), merge_err(r0, r1)),
565        );
566    pending.merge_from(up);
567    res
568}
569
570fn del_sheet_columns(
571    data: &sled::Tree,
572    pending: &mut Update,
573    base: Path,
574    cols: usize,
575) -> Result<()> {
576    use rayon::prelude::*;
577    let mut cs = SheetDescr::new(data, &base)?;
578    let max_col = cs.max_col;
579    let max_col_width = cs.max_col_width;
580    let cols = min(cs.max_col, cols);
581    let (up, res) = cs
582        .rows
583        .par_drain(..)
584        .fold(
585            || (Update::new(), String::new(), Ok(())),
586            |(mut pending, mut buf, mut res), row| {
587                for col in (max_col - cols..max_col + 1).rev() {
588                    buf.clear();
589                    write!(buf, "{}/{:0cw$}", row, col, cw = max_col_width).unwrap();
590                    let path = Path::from(ArcStr::from(buf.as_str()));
591                    res = merge_err(remove(data, &mut pending, path), res);
592                }
593                (pending, buf, res)
594            },
595        )
596        .map(|(u, _, r)| (u, r))
597        .reduce(
598            || (Update::new(), Ok(())),
599            |(u0, r0), (u1, r1)| (u0.merge(u1), merge_err(r0, r1)),
600        );
601    pending.merge_from(up);
602    res
603}
604
605fn add_sheet_rows(
606    data: &sled::Tree,
607    pending: &mut Update,
608    base: Path,
609    rows: usize,
610) -> Result<()> {
611    use rayon::prelude::*;
612    let base_levels = Path::levels(&base);
613    let mut max_row = 0;
614    let mut max_col = 0;
615    let mut max_row_width = 0;
616    let mut max_col_width = 0;
617    let mut cols = HashSet::new();
618    for r in data.scan_prefix(base.as_bytes()).keys() {
619        let k = r?;
620        let path = str::from_utf8(&k)?;
621        if Path::is_parent(&base, path) {
622            if Path::levels(path) == base_levels + 2 {
623                if let Some(row) = Path::dirname(path) {
624                    if let Some(row) = Path::basename(row) {
625                        if let Ok(i) = row.parse::<usize>() {
626                            max_row = max(i, max_row);
627                            max_row_width = max(row.len(), max_row_width);
628                        }
629                    }
630                }
631                let col = Path::basename(path)
632                    .and_then(|p| p.parse::<usize>().ok().map(move |i| (p, i)))
633                    .map(|(col, i)| {
634                        max_col = max(i, max_col);
635                        max_col_width = max(col.len(), max_col_width);
636                        i
637                    });
638                if let Some(col) = col {
639                    cols.insert(col);
640                }
641            }
642        }
643    }
644    if max_row_width < 1 + ((max_row + rows + 1) as f32).log10() as usize {
645        bail!("sheet is full")
646    }
647    let (up, res) = (max_row + 1..max_row + rows)
648        .into_par_iter()
649        .fold(
650            || (Update::new(), String::new(), Ok(())),
651            |(mut pending, mut buf, mut res), row| {
652                for col in &cols {
653                    buf.clear();
654                    write!(
655                        buf,
656                        "{:0rwidth$}/{:0cwidth$}",
657                        row,
658                        col,
659                        rwidth = max_row_width,
660                        cwidth = max_col_width,
661                    )
662                    .unwrap();
663                    let path = base.append(&buf);
664                    res = match data.contains_key(path.as_bytes()) {
665                        Ok(false) => {
666                            let r = set_data(data, &mut pending, true, path, Value::Null);
667                            merge_err(r, res)
668                        }
669                        Ok(true) => res,
670                        Err(e) => Err(e.into()),
671                    }
672                }
673                (pending, buf, res)
674            },
675        )
676        .map(|(u, _, r)| (u, r))
677        .reduce(
678            || (Update::new(), Ok(())),
679            |(u0, r0), (u1, r1)| (u0.merge(u1), merge_err(r0, r1)),
680        );
681    pending.merge_from(up);
682    res
683}
684
685fn del_sheet_rows(
686    data: &sled::Tree,
687    pending: &mut Update,
688    base: Path,
689    rows: usize,
690) -> Result<()> {
691    use rayon::prelude::*;
692    let mut cs = SheetDescr::new(data, &base)?;
693    let len = cs.rows.len();
694    let rows = min(len, rows);
695    let (up, res) = cs
696        .rows
697        .par_drain(len - rows..len + 1)
698        .fold(
699            || (Update::new(), Ok(())),
700            |(mut pending, res), row| {
701                let res = merge_err(remove_subtree(data, &mut pending, row), res);
702                (pending, res)
703            },
704        )
705        .reduce(
706            || (Update::new(), Ok(())),
707            |(u0, r0), (u1, r1)| (u0.merge(u1), merge_err(r0, r1)),
708        );
709    pending.merge_from(up);
710    res
711}
712
713fn create_table(
714    data: &sled::Tree,
715    locked: &sled::Tree,
716    pending: &mut Update,
717    base: Path,
718    rows: Vec<ArcStr>,
719    cols: Vec<ArcStr>,
720    lock: bool,
721) -> Result<()> {
722    use rayon::prelude::*;
723    let rows: Vec<String> =
724        rows.into_iter().map(|c| Path::escape(&c).into_owned()).collect();
725    let cols: Vec<String> =
726        cols.into_iter().map(|c| Path::escape(&c).into_owned()).collect();
727    let (up, res) = rows
728        .par_iter()
729        .map(|row| cols.par_iter().map(move |col| (row, col)))
730        .flatten()
731        .fold(
732            || (Update::new(), String::new(), Ok(())),
733            |(mut pending, mut buf, res), (row, col)| {
734                buf.clear();
735                write!(buf, "{}/{}", row, col).unwrap();
736                let path = base.append(buf.as_str());
737                let res = match data.contains_key(path.as_bytes()) {
738                    Ok(false) => {
739                        let r = set_data(data, &mut pending, true, path, Value::Null);
740                        merge_err(r, res)
741                    }
742                    Ok(true) => res,
743                    Err(e) => Err(e.into()),
744                };
745                (pending, buf, res)
746            },
747        )
748        .map(|(u, _, r)| (u, r))
749        .reduce(
750            || (Update::new(), Ok(())),
751            |(u0, r0), (u1, r1)| (u0.merge(u1), merge_err(r0, r1)),
752        );
753    pending.merge_from(up);
754    if lock {
755        set_locked(locked, pending, base)?
756    }
757    res
758}
759
760fn table_rows(data: &sled::Tree, base: &Path) -> Result<GPooled<Vec<Path>>> {
761    let base_levels = Path::levels(&base);
762    let mut paths = PATHS.take();
763    for r in data.scan_prefix(base.as_bytes()).keys() {
764        let k = r?;
765        if let Ok(path) = str::from_utf8(&*k) {
766            if Path::is_parent(base, path) {
767                let mut row = path;
768                let mut level = Path::levels(row);
769                while level > base_levels + 1 {
770                    row = Path::dirname(row).unwrap_or("/");
771                    level -= 1;
772                }
773                match paths.last() {
774                    None => paths.push(Path::from(ArcStr::from(row))),
775                    Some(last) => {
776                        if last.as_ref() != row {
777                            paths.push(Path::from(ArcStr::from(row)));
778                        }
779                    }
780                }
781            }
782        }
783    }
784    Ok(paths)
785}
786
787fn add_table_columns(
788    data: &sled::Tree,
789    pending: &mut Update,
790    base: Path,
791    cols: Vec<ArcStr>,
792) -> Result<()> {
793    use rayon::prelude::*;
794    let cols: Vec<String> =
795        cols.into_iter().map(|c| Path::escape(&c).into_owned()).collect();
796    let (up, res) = table_rows(data, &base)?
797        .par_drain(..)
798        .fold(
799            || (Update::new(), Ok(())),
800            |(mut pending, mut res), row| {
801                for col in &cols {
802                    let path = row.append(col);
803                    res = match data.contains_key(path.as_bytes()) {
804                        Ok(false) => {
805                            let r = set_data(data, &mut pending, true, path, Value::Null);
806                            merge_err(r, res)
807                        }
808                        Ok(true) => res,
809                        Err(e) => Err(e.into()),
810                    }
811                }
812                (pending, res)
813            },
814        )
815        .reduce(
816            || (Update::new(), Ok(())),
817            |(u0, r0), (u1, r1)| (u0.merge(u1), merge_err(r0, r1)),
818        );
819    pending.merge_from(up);
820    res
821}
822
823fn del_table_columns(
824    data: &sled::Tree,
825    pending: &mut Update,
826    base: Path,
827    cols: Vec<ArcStr>,
828) -> Result<()> {
829    use rayon::prelude::*;
830    let cols: Vec<String> =
831        cols.into_iter().map(|c| Path::escape(&c).into_owned()).collect();
832    let (up, res) = table_rows(data, &base)?
833        .par_drain(..)
834        .fold(
835            || (Update::new(), Ok(())),
836            |(mut pending, mut res), row| {
837                for col in &cols {
838                    let path = row.append(col);
839                    res = merge_err(remove(data, &mut pending, path), res);
840                }
841                (pending, res)
842            },
843        )
844        .reduce(
845            || (Update::new(), Ok(())),
846            |(u0, r0), (u1, r1)| (u0.merge(u1), merge_err(r0, r1)),
847        );
848    pending.merge_from(up);
849    res
850}
851
852fn add_table_rows(
853    data: &sled::Tree,
854    pending: &mut Update,
855    base: Path,
856    rows: Vec<ArcStr>,
857) -> Result<()> {
858    use rayon::prelude::*;
859    let rows: Vec<String> =
860        rows.into_iter().map(|c| Path::escape(&c).into_owned()).collect();
861    let base_levels = Path::levels(&base);
862    let mut cols = HashSet::new();
863    for r in data.scan_prefix(base.as_bytes()).keys() {
864        let k = r?;
865        let path = str::from_utf8(&k)?;
866        if Path::is_parent(&base, path) {
867            if Path::levels(path) == base_levels + 2 {
868                let col = Path::basename(path).unwrap_or("");
869                cols.insert(k.subslice(k.len() - col.len(), col.len()));
870            }
871        }
872    }
873    let cols: HashSet<String> = cols
874        .into_iter()
875        .filter_map(|k| str::from_utf8(&k).ok().map(String::from))
876        .collect();
877    let (up, res) = rows
878        .into_par_iter()
879        .fold(
880            || (Update::new(), String::new(), Ok(())),
881            |(mut pending, mut buf, mut res), row| {
882                for col in &cols {
883                    buf.clear();
884                    write!(buf, "{}/{}", row, col).unwrap();
885                    let path = base.append(&buf);
886                    res = match data.contains_key(path.as_bytes()) {
887                        Ok(false) => {
888                            let r = set_data(data, &mut pending, true, path, Value::Null);
889                            merge_err(r, res)
890                        }
891                        Ok(true) => res,
892                        Err(e) => Err(e.into()),
893                    }
894                }
895                (pending, buf, res)
896            },
897        )
898        .map(|(u, _, r)| (u, r))
899        .reduce(
900            || (Update::new(), Ok(())),
901            |(u0, r0), (u1, r1)| (u0.merge(u1), merge_err(r0, r1)),
902        );
903    pending.merge_from(up);
904    res
905}
906
907fn del_table_rows(
908    data: &sled::Tree,
909    pending: &mut Update,
910    base: Path,
911    rows: Vec<ArcStr>,
912) -> Result<()> {
913    use rayon::prelude::*;
914    let rows: Vec<String> =
915        rows.into_iter().map(|c| Path::escape(&c).into_owned()).collect();
916    let (up, res) = rows
917        .into_par_iter()
918        .fold(
919            || (Update::new(), Ok(())),
920            |(mut pending, res), row| {
921                let path = base.append(&row);
922                let res = merge_err(remove_subtree(data, &mut pending, path), res);
923                (pending, res)
924            },
925        )
926        .reduce(
927            || (Update::new(), Ok(())),
928            |(u0, r0), (u1, r1)| (u0.merge(u1), merge_err(r0, r1)),
929        );
930    pending.merge_from(up);
931    res
932}
933
934fn is_locked(locked: &sled::Tree, path: &Path, parent_only: bool) -> Result<bool> {
935    let mut iter = if parent_only {
936        locked.range(..path.as_bytes())
937    } else {
938        locked.range(..=path.as_bytes())
939    };
940    loop {
941        match iter.next_back() {
942            None => break Ok(false),
943            Some(r) => {
944                let (k, v) = r?;
945                let k = str::from_utf8(&k)?;
946                if Path::is_parent(k, &path) {
947                    break Ok(&*v == &[1u8]);
948                }
949            }
950        }
951    }
952}
953
954fn set_locked(locked: &sled::Tree, pending: &mut Update, path: Path) -> Result<()> {
955    if is_locked(locked, &path, true)? {
956        locked.remove(path.as_bytes())?;
957    } else {
958        locked.insert(path.as_bytes(), &[1u8])?;
959    }
960    pending.locked.push(path);
961    Ok(())
962}
963
964fn set_unlocked(locked: &sled::Tree, pending: &mut Update, path: Path) -> Result<()> {
965    if !is_locked(locked, &path, true)? {
966        locked.remove(path.as_bytes())?;
967    } else {
968        locked.insert(path.as_bytes(), &[0u8])?;
969    }
970    pending.unlocked.push(path);
971    Ok(())
972}
973
974fn remove_subtree(data: &sled::Tree, pending: &mut Update, path: Path) -> Result<()> {
975    use rayon::prelude::*;
976    let mut paths = PATHS.take();
977    for res in data.scan_prefix(path.as_ref()).keys() {
978        let key = res?;
979        let key = str::from_utf8(&key)?;
980        if Path::is_parent(&path, &key) {
981            paths.push(Path::from(ArcStr::from(key)));
982        }
983    }
984    let (up, res) = paths
985        .par_drain(..)
986        .fold(
987            || (Update::new(), Ok(())),
988            |(mut pending, res), path| {
989                let res = merge_err(remove(data, &mut pending, path), res);
990                (pending, res)
991            },
992        )
993        .reduce(
994            || (Update::new(), Ok(())),
995            |(u0, r0), (u1, r1)| (u0.merge(u1), merge_err(r0, r1)),
996        );
997    pending.merge_from(up);
998    res
999}
1000
1001fn add_root(roots: &sled::Tree, pending: &mut Update, path: Path) -> Result<()> {
1002    let key = path.as_bytes();
1003    if let Some(r) = roots.range(..key).next_back() {
1004        let (prev, _) = r?;
1005        let prev = str::from_utf8(&prev)?;
1006        if Path::is_parent(prev, &path) {
1007            bail!("a parent path is already a root")
1008        }
1009    }
1010    if !roots.contains_key(key)? {
1011        roots.insert(key, &[])?;
1012        pending.added_roots.push(path);
1013    }
1014    Ok(())
1015}
1016
1017fn del_root(
1018    data: &sled::Tree,
1019    roots: &sled::Tree,
1020    locked: &sled::Tree,
1021    pending: &mut Update,
1022    path: Path,
1023) -> Result<()> {
1024    let key = path.as_bytes();
1025    if roots.contains_key(key)? {
1026        let mut iter = roots.range(..key).keys();
1027        let remove = loop {
1028            match iter.next_back() {
1029                None => break false,
1030                Some(r) => {
1031                    let k = r?;
1032                    let k = str::from_utf8(&k)?;
1033                    if Path::is_parent(k, &path) {
1034                        break true;
1035                    }
1036                }
1037            }
1038        };
1039        if remove {
1040            remove_subtree(data, pending, path.clone())?
1041        }
1042        for r in roots.scan_prefix(key).keys() {
1043            let k = r?;
1044            let k = str::from_utf8(&k)?;
1045            if Path::is_parent(&path, k) {
1046                roots.remove(&k)?;
1047                pending.removed_roots.push(Path::from(ArcStr::from(k)));
1048            }
1049        }
1050        for r in locked.scan_prefix(key).keys() {
1051            let k = r?;
1052            let k = str::from_utf8(&k)?;
1053            if Path::is_parent(&path, k) {
1054                locked.remove(&k)?;
1055                pending.unlocked.push(Path::from(ArcStr::from(k)));
1056            }
1057        }
1058    }
1059    Ok(())
1060}
1061
1062fn send_reply(reply: Reply, r: Result<()>) {
1063    match (r, reply) {
1064        (Ok(()), Some(reply)) => {
1065            reply.send(Value::Null);
1066        }
1067        (Err(e), Some(reply)) => {
1068            let e = Value::error(format!("{}", e));
1069            reply.send(e);
1070        }
1071        (_, None) => (),
1072    }
1073}
1074
1075fn commit_complex(
1076    data: &sled::Tree,
1077    locked: &sled::Tree,
1078    roots: &sled::Tree,
1079    mut txn: Txn,
1080) -> Update {
1081    let mut pending = Update::new();
1082    for (op, reply) in txn.0.drain(..) {
1083        let r = match op {
1084            TxnOp::CreateSheet { base, rows, cols, max_rows, max_columns, lock } => {
1085                create_sheet(
1086                    &data,
1087                    &locked,
1088                    &mut pending,
1089                    base,
1090                    rows,
1091                    cols,
1092                    max_rows,
1093                    max_columns,
1094                    lock,
1095                )
1096            }
1097            TxnOp::AddSheetColumns { base, cols } => {
1098                add_sheet_columns(&data, &mut pending, base, cols)
1099            }
1100            TxnOp::AddSheetRows { base, rows } => {
1101                add_sheet_rows(&data, &mut pending, base, rows)
1102            }
1103            TxnOp::DelSheetColumns { base, cols } => {
1104                del_sheet_columns(&data, &mut pending, base, cols)
1105            }
1106            TxnOp::DelSheetRows { base, rows } => {
1107                del_sheet_rows(&data, &mut pending, base, rows)
1108            }
1109            TxnOp::CreateTable { base, rows, cols, lock } => {
1110                create_table(&data, &locked, &mut pending, base, rows, cols, lock)
1111            }
1112            TxnOp::AddTableColumns { base, cols } => {
1113                add_table_columns(&data, &mut pending, base, cols)
1114            }
1115            TxnOp::AddTableRows { base, rows } => {
1116                add_table_rows(&data, &mut pending, base, rows)
1117            }
1118            TxnOp::DelTableColumns { base, cols } => {
1119                del_table_columns(&data, &mut pending, base, cols)
1120            }
1121            TxnOp::DelTableRows { base, rows } => {
1122                del_table_rows(&data, &mut pending, base, rows)
1123            }
1124            TxnOp::Remove(path) => remove(&data, &mut pending, path),
1125            TxnOp::RemoveSubtree(path) => remove_subtree(&data, &mut pending, path),
1126            TxnOp::SetData(update, path, value) => {
1127                set_data(&data, &mut pending, update, path, value)
1128            }
1129            TxnOp::SetLocked(path) => set_locked(&locked, &mut pending, path),
1130            TxnOp::SetUnlocked(path) => set_unlocked(&locked, &mut pending, path),
1131            TxnOp::AddRoot(path) => add_root(&roots, &mut pending, path),
1132            TxnOp::DelRoot(path) => del_root(&data, &roots, &locked, &mut pending, path),
1133            TxnOp::Flush(finished) => {
1134                let _: Result<_, _> = data.flush();
1135                let _: Result<_, _> = locked.flush();
1136                let _: Result<_, _> = finished.send(());
1137                Ok(())
1138            }
1139        };
1140        send_reply(reply, r);
1141    }
1142    pending
1143}
1144
1145fn commit_simple(data: &sled::Tree, locked: &sled::Tree, mut txn: Txn) -> Update {
1146    use rayon::prelude::*;
1147    let mut by_path = BYPATH.take();
1148    for (op, reply) in txn.0.drain(..) {
1149        by_path.entry(op.path()).or_insert_with(|| STXNS.take()).push((op, reply));
1150    }
1151    by_path
1152        .par_drain()
1153        .fold(
1154            || Update::new(),
1155            |mut pending, (_, mut ops)| {
1156                for (op, reply) in ops.drain(..) {
1157                    let r = match op {
1158                        TxnOp::SetData(update, path, value) => {
1159                            set_data(data, &mut pending, update, path, value)
1160                        }
1161                        TxnOp::Remove(path) => remove(data, &mut pending, path),
1162                        TxnOp::SetLocked(path) => set_locked(locked, &mut pending, path),
1163                        TxnOp::SetUnlocked(path) => {
1164                            set_unlocked(locked, &mut pending, path)
1165                        }
1166                        TxnOp::CreateSheet { .. }
1167                        | TxnOp::AddSheetColumns { .. }
1168                        | TxnOp::AddSheetRows { .. }
1169                        | TxnOp::DelSheetColumns { .. }
1170                        | TxnOp::DelSheetRows { .. }
1171                        | TxnOp::CreateTable { .. }
1172                        | TxnOp::AddTableColumns { .. }
1173                        | TxnOp::AddTableRows { .. }
1174                        | TxnOp::DelTableColumns { .. }
1175                        | TxnOp::DelTableRows { .. }
1176                        | TxnOp::RemoveSubtree { .. }
1177                        | TxnOp::AddRoot(_)
1178                        | TxnOp::DelRoot(_)
1179                        | TxnOp::Flush(_) => unreachable!(),
1180                    };
1181                    send_reply(reply, r)
1182                }
1183                pending
1184            },
1185        )
1186        .reduce(|| Update::new(), |u0, u1| u0.merge(u1))
1187}
1188
/// Published statistics about the commit pipeline.
struct Stats {
    // publisher used to start the update batches for the values below
    publisher: Publisher,
    // published at `<base>/busy`, initially false
    busy: Val,
    // published at `<base>/queued`, initially 0
    queued: Val,
    // number of transactions submitted but not yet picked up for commit;
    // this is the value pushed to `queued`
    queued_cnt: AtomicUsize,
    // published at `<base>/background-delete`, initially false
    deleting: Val,
    // batches are handed to `stats_commit_task` through this channel
    to_commit: UnboundedSender<UpdateBatch>,
}
1197
1198async fn stats_commit_task(rx: UnboundedReceiver<UpdateBatch>) {
1199    let mut rx = Batched::new(rx, 1_000_000);
1200    let mut pending: Option<UpdateBatch> = None;
1201    while let Some(batch) = rx.next().await {
1202        match batch {
1203            BatchItem::InBatch(mut b) => match &mut pending {
1204                Some(pending) => {
1205                    let _: Result<_> = pending.merge_from(&mut b);
1206                }
1207                None => {
1208                    pending = Some(b);
1209                }
1210            },
1211            BatchItem::EndBatch => {
1212                if let Some(pending) = pending.take() {
1213                    pending.commit(Some(Duration::from_secs(10))).await
1214                }
1215            }
1216        }
1217    }
1218}
1219
1220impl Stats {
1221    fn new(publisher: Publisher, base_path: Path) -> Result<Arc<Self>> {
1222        let busy = publisher.publish(base_path.append("busy"), false)?;
1223        let queued = publisher.publish(base_path.append("queued"), 0)?;
1224        let deleting = publisher.publish(base_path.append("background-delete"), false)?;
1225        let (tx, rx) = unbounded();
1226        task::spawn(stats_commit_task(rx));
1227        Ok(Arc::new(Stats {
1228            publisher,
1229            busy,
1230            queued,
1231            queued_cnt: AtomicUsize::new(0),
1232            deleting,
1233            to_commit: tx,
1234        }))
1235    }
1236
1237    fn inc_queued(&self) {
1238        let n = self.queued_cnt.fetch_add(1, Ordering::Relaxed) + 1;
1239        let mut batch = self.publisher.start_batch();
1240        self.queued.update(&mut batch, n);
1241        let _: Result<_, _> = self.to_commit.unbounded_send(batch);
1242    }
1243
1244    fn dec_queued(&self) {
1245        let n = self.queued_cnt.fetch_sub(1, Ordering::Relaxed) - 1;
1246        let mut batch = self.publisher.start_batch();
1247        self.queued.update(&mut batch, n);
1248        self.busy.update(&mut batch, true);
1249        let _: Result<_, _> = self.to_commit.unbounded_send(batch);
1250    }
1251
1252    fn set_busy(&self, busy: bool) {
1253        let mut batch = self.publisher.start_batch();
1254        self.busy.update(&mut batch, busy);
1255        let _: Result<_, _> = self.to_commit.unbounded_send(batch);
1256    }
1257
1258    fn set_deleting(&self, deleting: bool) {
1259        let mut batch = self.publisher.start_batch();
1260        self.deleting.update(&mut batch, deleting);
1261        let _: Result<_, _> = self.to_commit.unbounded_send(batch);
1262    }
1263}
1264
1265async fn background_delete_task(data: sled::Tree, stats: Option<Arc<Stats>>) {
1266    if let Some(stats) = &stats {
1267        stats.set_deleting(true);
1268    }
1269    task::block_in_place(|| {
1270        for r in data.iter() {
1271            if let Ok((k, v)) = r {
1272                if let DatumKind::Deleted = DatumKind::decode(&mut &*v) {
1273                    // CR estokes: log these errors
1274                    let _: Result<_, _> =
1275                        data.compare_and_swap(k, Some(v), None::<sled::IVec>);
1276                }
1277            }
1278        }
1279    });
1280    if let Some(stats) = &stats {
1281        stats.set_deleting(false);
1282    }
1283}
1284
/// Main commit loop: receives transactions from `incoming`, applies them to
/// the trees, and forwards the resulting `Update` on `outgoing`.
///
/// Transactions that contain only `SetData`, `Remove`, `SetLocked` and
/// `SetUnlocked` go through `commit_simple`; everything else goes through
/// `commit_complex`. Transactions containing any kind of removal schedule a
/// run of `background_delete_task`; at most one such task runs at a time.
///
/// The loop ends when `incoming` is exhausted or when `outgoing` has been
/// closed by the receiver.
async fn commit_txns_task(
    stats: Option<Arc<Stats>>,
    data: sled::Tree,
    locked: sled::Tree,
    roots: sled::Tree,
    incoming: UnboundedReceiver<Txn>,
    outgoing: UnboundedSender<Update>,
) {
    let mut incoming = incoming.fuse();
    // Resolves when the running delete task (if any) has finished; never
    // resolves when there is no task, so the select arm stays dormant.
    async fn wait_delete_task(jh: &mut Option<task::JoinHandle<()>>) {
        match jh {
            Some(jh) => {
                // CR estokes: log this
                let _: Result<_, _> = jh.await;
            }
            None => future::pending().await,
        }
    }
    // handle of the background delete currently running, if any
    let mut delete_task: Option<task::JoinHandle<()>> = None;
    // set when a removal was committed that no delete task has swept yet
    let mut delete_required = false;
    loop {
        // biased: completion of the delete task is checked before new
        // transactions
        select_biased! {
            () = wait_delete_task(&mut delete_task).fuse() => {
                // NOTE: a pending `delete_required` is not acted on here; a
                // new sweep is only spawned after the next transaction.
                delete_task = None;
            }
            txn = incoming.select_next_some() => {
                if let Some(stats) = &stats {
                    stats.dec_queued();
                }
                // Classify the transaction: `simple` stays true only if every
                // op can be handled by commit_simple, `delete` becomes true
                // if any op removes data.
                let (simple, delete) =
                    txn.0.iter().fold((true, false), |(simple, delete), op| match &op.0 {
                        TxnOp::CreateSheet { .. }
                        | TxnOp::AddSheetColumns { .. }
                        | TxnOp::AddSheetRows { .. }
                        | TxnOp::CreateTable { .. }
                        | TxnOp::AddTableColumns { .. }
                        | TxnOp::AddTableRows { .. }
                        | TxnOp::AddRoot(_)
                        | TxnOp::Flush(_) => (false, delete),
                        TxnOp::RemoveSubtree { .. }
                        | TxnOp::DelTableColumns { .. }
                        | TxnOp::DelTableRows { .. }
                        | TxnOp::DelSheetColumns { .. }
                        | TxnOp::DelSheetRows { .. }
                        | TxnOp::DelRoot(_) => (false, true),
                        TxnOp::Remove(_) => (simple, true),
                        TxnOp::SetData(_, _, _)
                        | TxnOp::SetLocked(_)
                        | TxnOp::SetUnlocked(_) => (simple, delete),
                    });
                delete_required |= delete;
                // the commit itself is blocking work, so tell the runtime
                let pending = if simple {
                    task::block_in_place(|| commit_simple(&data, &locked, txn))
                } else {
                    task::block_in_place(|| commit_complex(&data, &locked, &roots, txn))
                };
                if let Some(stats) = &stats {
                    stats.set_busy(false);
                }
                // nobody is listening for updates anymore, shut down
                match outgoing.unbounded_send(pending) {
                    Ok(()) => (),
                    Err(_) => break,
                }
                // start a sweep unless one is already running; in that case
                // `delete_required` stays set for a later iteration
                if delete_required && delete_task.is_none() {
                    let job = background_delete_task(data.clone(), stats.clone());
                    delete_required = false;
                    delete_task = Some(task::spawn(job));
                }
            }
            complete => break,
        }
    }
}
1358
/// Handle to the container database. Cloning yields another handle to the
/// same database and the same commit queue.
#[derive(Clone)]
pub struct Db {
    // the underlying sled database
    db: sled::Db,
    // the "data" tree
    data: sled::Tree,
    // the "locked" tree; a value of [1] marks a path as locked
    locked: sled::Tree,
    // the "roots" tree; its keys are the root paths
    roots: sled::Tree,
    // transactions are handed to `commit_txns_task` through this channel
    submit_txn: UnboundedSender<Txn>,
    // present only when a base path for statistics was given to `new`
    stats: Option<Arc<Stats>>,
}
1368
1369impl Db {
1370    pub(super) fn new(
1371        cfg: &Params,
1372        publisher: Publisher,
1373        base_path: Option<Path>,
1374    ) -> Result<(Self, UnboundedReceiver<Update>)> {
1375        let stats = match base_path {
1376            None => None,
1377            Some(p) => Some(Stats::new(publisher, p)?),
1378        };
1379        let path = cfg
1380            .db
1381            .as_ref()
1382            .map(PathBuf::from)
1383            .or_else(Params::default_db_path)
1384            .ok_or_else(|| {
1385                anyhow!("db dir not specified and no default could be determined")
1386            })?;
1387        let db = sled::Config::default()
1388            .cache_capacity(cfg.cache_size.unwrap_or(16 * 1024 * 1024))
1389            .path(&path)
1390            .open()?;
1391        let data = db.open_tree("data")?;
1392        let locked = db.open_tree("locked")?;
1393        let roots = db.open_tree("roots")?;
1394        let (tx_incoming, rx_incoming) = unbounded();
1395        let (tx_outgoing, rx_outgoing) = unbounded();
1396        task::spawn(commit_txns_task(
1397            stats.clone(),
1398            data.clone(),
1399            locked.clone(),
1400            roots.clone(),
1401            rx_incoming,
1402            tx_outgoing,
1403        ));
1404        Ok((Db { db, data, locked, roots, submit_txn: tx_incoming, stats }, rx_outgoing))
1405    }
1406
1407    pub fn open_tree(&self, name: &str) -> Result<sled::Tree> {
1408        if name == "data" || name == "locked" || name == "roots" {
1409            bail!("tree name reserved")
1410        }
1411        Ok(self.db.open_tree(name)?)
1412    }
1413
1414    pub(super) fn commit(&self, txn: Txn) {
1415        if let Some(stats) = &self.stats {
1416            stats.inc_queued();
1417        }
1418        let _: Result<_, _> = self.submit_txn.unbounded_send(txn);
1419    }
1420
1421    pub(super) async fn flush_async(&self) -> Result<()> {
1422        let (tx, rx) = oneshot::channel();
1423        let mut txn = Txn::new();
1424        txn.0.push((TxnOp::Flush(tx), None));
1425        self.commit(txn);
1426        let _: Result<_, _> = rx.await;
1427        Ok(())
1428    }
1429
1430    pub fn relative_column(&self, base: &Path, offset: i32) -> Result<Option<Path>> {
1431        use std::ops::Bound::{self, *};
1432        let rowbase = Path::dirname(base).ok_or_else(|| anyhow!("no row"))?;
1433        let mut i = 0;
1434        if offset == 0 {
1435            Ok(Some(base.clone()))
1436        } else if offset < 0 {
1437            let mut iter = self
1438                .data
1439                .range::<&str, (Bound<&str>, Bound<&str>)>((Unbounded, Excluded(&**base)))
1440                .keys();
1441            while let Some(r) = iter.next_back() {
1442                let r = r?;
1443                let path = str::from_utf8(&r)?;
1444                if !Path::is_parent(&base, path) {
1445                    return Ok(None);
1446                } else if Path::dirname(path) == Some(rowbase) {
1447                    i -= 1;
1448                    if i == offset {
1449                        return Ok(Some(Path::from(ArcStr::from(path))));
1450                    }
1451                } else {
1452                    return Ok(None);
1453                }
1454            }
1455            Ok(None)
1456        } else {
1457            let iter = self
1458                .data
1459                .range::<&str, (Bound<&str>, Bound<&str>)>((Excluded(&**base), Unbounded))
1460                .keys();
1461            for r in iter {
1462                let r = r?;
1463                let path = str::from_utf8(&r)?;
1464                if !Path::is_parent(&base, path) {
1465                    return Ok(None);
1466                } else if Path::dirname(path) == Some(rowbase) {
1467                    i += 1;
1468                    if i == offset {
1469                        return Ok(Some(Path::from(ArcStr::from(path))));
1470                    }
1471                } else {
1472                    return Ok(None);
1473                }
1474            }
1475            Ok(None)
1476        }
1477    }
1478
1479    pub fn relative_row(&self, base: &Path, offset: i32) -> Result<Option<Path>> {
1480        use std::ops::Bound::{self, *};
1481        macro_rules! or_none {
1482            ($e:expr) => {
1483                match $e {
1484                    Some(p) => p,
1485                    None => return Ok(None),
1486                }
1487            };
1488        }
1489        let column = or_none!(Path::basename(base));
1490        let mut rowbase = ArcStr::from(or_none!(Path::dirname(base)));
1491        let tablebase = ArcStr::from(or_none!(Path::dirname(rowbase.as_str())));
1492        let mut i = 0;
1493        if offset == 0 {
1494            Ok(Some(base.clone()))
1495        } else if offset < 0 {
1496            let mut iter = self
1497                .data
1498                .range::<&str, (Bound<&str>, Bound<&str>)>((Unbounded, Excluded(&**base)))
1499                .keys();
1500            while let Some(r) = iter.next_back() {
1501                let r = r?;
1502                let path = str::from_utf8(&r)?;
1503                let path_column = or_none!(Path::basename(path));
1504                let path_row = or_none!(Path::dirname(path));
1505                let path_table = or_none!(Path::dirname(path_row));
1506                if !Path::is_parent(&base, path) {
1507                    return Ok(None);
1508                } else if path_table == tablebase {
1509                    if path_row != rowbase.as_str() {
1510                        i -= 1;
1511                        rowbase = ArcStr::from(path_row);
1512                    }
1513                    if i == offset && path_column == column {
1514                        return Ok(Some(Path::from(ArcStr::from(path))));
1515                    }
1516                    if i < offset {
1517                        return Ok(None);
1518                    }
1519                } else {
1520                    return Ok(None);
1521                }
1522            }
1523            Ok(None)
1524        } else {
1525            let iter = self
1526                .data
1527                .range::<&str, (Bound<&str>, Bound<&str>)>((Excluded(&**base), Unbounded))
1528                .keys();
1529            for r in iter {
1530                let r = r?;
1531                let path = str::from_utf8(&r)?;
1532                let path_column = or_none!(Path::basename(path));
1533                let path_row = or_none!(Path::dirname(path));
1534                let path_table = or_none!(Path::dirname(path_row));
1535                if !Path::is_parent(&base, path) {
1536                    return Ok(None);
1537                } else if path_table == tablebase {
1538                    if path_row != rowbase.as_str() {
1539                        i += 1;
1540                        rowbase = ArcStr::from(path_row);
1541                    }
1542                    if i == offset && path_column == column {
1543                        return Ok(Some(Path::from(ArcStr::from(path))));
1544                    }
1545                    if i > offset {
1546                        return Ok(None);
1547                    }
1548                } else {
1549                    return Ok(None);
1550                }
1551            }
1552            Ok(None)
1553        }
1554    }
1555
    /// Look up `path` in the data tree by delegating to the free function
    /// `lookup_value`. Returns `Ok(None)` when that function reports no
    /// datum for the path.
    pub fn lookup<P: AsRef<[u8]>>(&self, path: P) -> Result<Option<Datum>> {
        lookup_value(&self.data, path)
    }
1559
1560    pub fn lookup_value<P: AsRef<[u8]>>(&self, path: P) -> Option<Value> {
1561        self.lookup(path).ok().flatten().and_then(|d| match d {
1562            Datum::Deleted | Datum::Formula(_, _) => None,
1563            Datum::Data(v) => Some(v),
1564        })
1565    }
1566
1567    fn decode_iter(
1568        iter: sled::Iter,
1569    ) -> impl Iterator<Item = Result<(Path, DatumKind, sled::IVec)>> + 'static {
1570        iter.map(|res| {
1571            let (key, val) = res?;
1572            let path = Path::from(ArcStr::from(str::from_utf8(&key)?));
1573            let value = DatumKind::decode(&mut &*val);
1574            Ok((path, value, val))
1575        })
1576    }
1577
    /// Iterate over every entry of the data tree. Each item is the path,
    /// the kind of the datum stored there, and the raw encoded value.
    pub fn iter(
        &self,
    ) -> impl Iterator<Item = Result<(Path, DatumKind, sled::IVec)>> + 'static {
        Self::decode_iter(self.data.iter())
    }
1583
    /// Iterate over the entries of the data tree whose key starts with
    /// `prefix`. The match is on the raw key prefix (sled `scan_prefix`),
    /// so keys that merely share the leading text with `prefix` are
    /// included as well.
    pub fn iter_prefix(
        &self,
        prefix: Path,
    ) -> impl Iterator<Item = Result<(Path, DatumKind, sled::IVec)>> + 'static {
        Self::decode_iter(self.data.scan_prefix(&*prefix))
    }
1590
    /// Count the entries of the data tree whose key starts with `prefix`.
    ///
    /// This is O(N): every matching entry is visited. Entries that fail
    /// to read are counted too, since the scan results are not inspected.
    pub fn prefix_len(&self, prefix: &Path) -> usize {
        self.data.scan_prefix(&**prefix).count()
    }
1595
1596    pub fn locked(&self) -> impl Iterator<Item = Result<(Path, bool)>> + 'static {
1597        self.locked.iter().map(|r| {
1598            let (k, v) = r?;
1599            let path = Path::from(ArcStr::from(str::from_utf8(&k)?));
1600            let locked = &*v == &[1u8];
1601            Ok((path, locked))
1602        })
1603    }
1604
    /// Iterate over the paths stored in the roots tree (via `iter_paths`).
    pub fn roots(&self) -> impl Iterator<Item = Result<Path>> + 'static {
        iter_paths(&self.roots)
    }
1608
1609    pub fn clear(&self) -> Result<()> {
1610        self.db.clear()?;
1611        self.data.clear()?;
1612        self.locked.clear()?;
1613        Ok(self.roots.clear()?)
1614    }
1615}