use crate::acl::{Actor, Can, Permission, Policy};
use crate::crdt::{Causal, Crdt, DotStore};
use crate::crypto::Keypair;
use crate::cursor::array_util::ArrayMetaEntry;
use crate::dotset::Dot;
use crate::fraction::Fraction;
use crate::id::{DocId, PeerId};
use crate::path::{Path, PathBuf};
use crate::schema::{ArchivedSchema, PrimitiveKind, Schema};
use crate::subscriber::Subscriber;
use anyhow::{anyhow, Context, Result};
use rkyv::Archived;
use smallvec::SmallVec;
/// A position inside a document's CRDT, paired with the schema node that
/// describes the data at that position.
///
/// Navigation methods (`key_*`, `field`, `index`) extend `path` and narrow
/// `schema`; mutation methods build signed `Causal` deltas for the current
/// position without applying them.
#[derive(Clone, Debug)]
pub struct Cursor<'a> {
/// Keypair used to sign every path this cursor emits.
key: Keypair,
/// Peer id derived from `key`; used for permission checks and signatures.
peer_id: PeerId,
/// Schema node describing the data at the current `path`.
schema: &'a Archived<Schema>,
/// The CRDT that is scanned for existing paths and permissions.
crdt: &'a Crdt,
/// Current path; starts with the document id.
path: PathBuf,
/// One wrapper per array element entered through `index`.
array: SmallVec<[ArrayWrapper; 1]>,
}
impl<'a> Cursor<'a> {
pub fn new(key: Keypair, id: DocId, schema: &'a Archived<Schema>, crdt: &'a Crdt) -> Self {
let mut path = PathBuf::new();
path.doc(&id);
Self {
key,
peer_id: key.peer_id(),
schema,
path,
crdt,
array: Default::default(),
}
}
/// Returns a subscriber watching the CRDT at the cursor's current path.
pub fn subscribe(&self) -> Subscriber {
    let here = self.path.as_path();
    self.crdt.watch_path(here)
}
/// Asks the CRDT whether `peer` holds permission `perm` at the current path.
pub fn can(&self, peer: &PeerId, perm: Permission) -> Result<bool> {
    let target = self.path.as_path();
    self.crdt.can(peer, perm, target)
}
/// Reports whether the flag at the current path is set.
///
/// A flag counts as set when at least one stored path below the cursor has
/// a nonce segment two levels above its final segment.
///
/// # Errors
/// Fails with "not a flag" when the current schema node is not a flag.
pub fn enabled(&self) -> Result<bool> {
    if !matches!(self.schema, ArchivedSchema::Flag) {
        return Err(anyhow!("not a flag"));
    }
    let mut nonces = self
        .crdt
        .scan_path(self.path.as_path())
        .filter_map(|k| Path::new(&k).parent()?.parent()?.last()?.nonce());
    Ok(nonces.next().is_some())
}
/// Iterates over the `bool` values stored in the register at the current path.
///
/// Stored paths whose segment two levels above the end is not a `bool`
/// primitive are skipped.
///
/// # Errors
/// Fails with "not a Reg<bool>" when the schema node is not a `bool` register.
pub fn bools(&self) -> Result<impl Iterator<Item = Result<bool>>> {
    if !matches!(self.schema, ArchivedSchema::Reg(PrimitiveKind::Bool)) {
        return Err(anyhow!("not a Reg<bool>"));
    }
    let values = self.crdt.scan_path(self.path.as_path()).filter_map(|entry| {
        let value = Path::new(&entry).parent()?.parent()?.last()?.prim_bool()?;
        Some(Ok(value))
    });
    Ok(values)
}
/// Iterates over the `u64` values stored in the register at the current path.
///
/// Stored paths whose segment two levels above the end is not a `u64`
/// primitive are skipped.
///
/// # Errors
/// Fails with "not a Reg<u64>" when the schema node is not a `u64` register.
pub fn u64s(&self) -> Result<impl Iterator<Item = Result<u64>>> {
    if !matches!(self.schema, ArchivedSchema::Reg(PrimitiveKind::U64)) {
        return Err(anyhow!("not a Reg<u64>"));
    }
    let values = self.crdt.scan_path(self.path.as_path()).filter_map(|entry| {
        let value = Path::new(&entry).parent()?.parent()?.last()?.prim_u64()?;
        Some(Ok(value))
    });
    Ok(values)
}
/// Iterates over the `i64` values stored in the register at the current path.
///
/// Stored paths whose segment two levels above the end is not an `i64`
/// primitive are skipped.
///
/// # Errors
/// Fails with "not a Reg<i64>" when the schema node is not an `i64` register.
pub fn i64s(&self) -> Result<impl Iterator<Item = Result<i64>>> {
    if !matches!(self.schema, ArchivedSchema::Reg(PrimitiveKind::I64)) {
        return Err(anyhow!("not a Reg<i64>"));
    }
    let values = self.crdt.scan_path(self.path.as_path()).filter_map(|entry| {
        let value = Path::new(&entry).parent()?.parent()?.last()?.prim_i64()?;
        Some(Ok(value))
    });
    Ok(values)
}
/// Iterates over the string values stored in the register at the current path.
///
/// Each value is copied into an owned `String`. Stored paths whose segment
/// two levels above the end is not a string primitive are skipped.
///
/// # Errors
/// Fails with "not a Reg<String>" when the schema node is not a string register.
pub fn strs(&self) -> Result<impl Iterator<Item = Result<String>>> {
    if !matches!(self.schema, ArchivedSchema::Reg(PrimitiveKind::Str)) {
        return Err(anyhow!("not a Reg<String>"));
    }
    let values = self.crdt.scan_path(self.path.as_path()).filter_map(|entry| {
        let value = Path::new(&entry)
            .parent()?
            .parent()?
            .last()?
            .prim_str()?
            .to_owned();
        Some(Ok(value))
    });
    Ok(values)
}
/// Descends into the table entry keyed by the `bool` `key`.
///
/// Appends the key to the path and switches to the table's value schema.
///
/// # Errors
/// Fails with "not a Table<bool, _>" when the schema node is not such a table.
pub fn key_bool(&mut self, key: bool) -> Result<&mut Self> {
    match self.schema {
        ArchivedSchema::Table(PrimitiveKind::Bool, value_schema) => {
            self.path.prim_bool(key);
            self.schema = value_schema;
            Ok(self)
        }
        _ => Err(anyhow!("not a Table<bool, _>")),
    }
}
/// Descends into the table entry keyed by the `u64` `key`.
///
/// Appends the key to the path and switches to the table's value schema.
///
/// # Errors
/// Fails with "not a Table<u64, _>" when the schema node is not such a table.
pub fn key_u64(&mut self, key: u64) -> Result<&mut Self> {
    match self.schema {
        ArchivedSchema::Table(PrimitiveKind::U64, value_schema) => {
            self.path.prim_u64(key);
            self.schema = value_schema;
            Ok(self)
        }
        _ => Err(anyhow!("not a Table<u64, _>")),
    }
}
/// Descends into the table entry keyed by the `i64` `key`.
///
/// Appends the key to the path and switches to the table's value schema.
///
/// # Errors
/// Fails with "not a Table<i64, _>" when the schema node is not such a table.
pub fn key_i64(&mut self, key: i64) -> Result<&mut Self> {
    match self.schema {
        ArchivedSchema::Table(PrimitiveKind::I64, value_schema) => {
            self.path.prim_i64(key);
            self.schema = value_schema;
            Ok(self)
        }
        _ => Err(anyhow!("not a Table<i64, _>")),
    }
}
/// Descends into the table entry keyed by the string `key`.
///
/// Appends the key to the path and switches to the table's value schema.
///
/// # Errors
/// Fails with "not a Table<String, _>" when the schema node is not such a table.
pub fn key_str(&mut self, key: &str) -> Result<&mut Self> {
    match self.schema {
        ArchivedSchema::Table(PrimitiveKind::Str, value_schema) => {
            self.path.prim_str(key);
            self.schema = value_schema;
            Ok(self)
        }
        _ => Err(anyhow!("not a Table<String, _>")),
    }
}
/// Iterates over the `bool` keys found in the table at the current path.
///
/// Each stored path is made relative to the cursor path and its first
/// segment is read as a `bool`; entries that do not fit are skipped. A key
/// is yielded once per stored path, so duplicates are possible.
///
/// # Errors
/// Fails with "not a Table<bool, _>" when the schema node is not such a table.
pub fn keys_bool(&self) -> Result<impl Iterator<Item = bool> + '_> {
    if !matches!(self.schema, ArchivedSchema::Table(PrimitiveKind::Bool, _)) {
        return Err(anyhow!("not a Table<bool, _>"));
    }
    let keys = self.crdt.scan_path(self.path.as_path()).filter_map(|entry| {
        Path::new(&entry)
            .strip_prefix(self.path.as_path())
            .ok()?
            .first()?
            .prim_bool()
    });
    Ok(keys)
}
/// Iterates over the `u64` keys found in the table at the current path.
///
/// Each stored path is made relative to the cursor path and its first
/// segment is read as a `u64`; entries that do not fit are skipped. A key
/// is yielded once per stored path, so duplicates are possible.
///
/// # Errors
/// Fails with "not a Table<u64, _>" when the schema node is not such a table.
pub fn keys_u64(&self) -> Result<impl Iterator<Item = u64> + '_> {
    if !matches!(self.schema, ArchivedSchema::Table(PrimitiveKind::U64, _)) {
        return Err(anyhow!("not a Table<u64, _>"));
    }
    let keys = self.crdt.scan_path(self.path.as_path()).filter_map(|entry| {
        Path::new(&entry)
            .strip_prefix(self.path.as_path())
            .ok()?
            .first()?
            .prim_u64()
    });
    Ok(keys)
}
/// Iterates over the `i64` keys found in the table at the current path.
///
/// Each stored path is made relative to the cursor path and its first
/// segment is read as an `i64`; entries that do not fit are skipped. A key
/// is yielded once per stored path, so duplicates are possible.
///
/// # Errors
/// Fails with "not a Table<i64, _>" when the schema node is not such a table.
pub fn keys_i64(&self) -> Result<impl Iterator<Item = i64> + '_> {
    if !matches!(self.schema, ArchivedSchema::Table(PrimitiveKind::I64, _)) {
        return Err(anyhow!("not a Table<i64, _>"));
    }
    let keys = self.crdt.scan_path(self.path.as_path()).filter_map(|entry| {
        Path::new(&entry)
            .strip_prefix(self.path.as_path())
            .ok()?
            .first()?
            .prim_i64()
    });
    Ok(keys)
}
/// Iterates over the string keys found in the table at the current path.
///
/// Each stored path is made relative to the cursor path and its first
/// segment is read as an owned string; entries that do not fit are skipped.
/// A key is yielded once per stored path, so duplicates are possible.
///
/// # Errors
/// Fails with "not a Table<String, _>" when the schema node is not such a table.
pub fn keys_str(&self) -> Result<impl Iterator<Item = String> + '_> {
    if !matches!(self.schema, ArchivedSchema::Table(PrimitiveKind::Str, _)) {
        return Err(anyhow!("not a Table<String, _>"));
    }
    let keys = self.crdt.scan_path(self.path.as_path()).filter_map(|entry| {
        Path::new(&entry)
            .strip_prefix(self.path.as_path())
            .ok()?
            .first()?
            .prim_string()
    });
    Ok(keys)
}
/// Descends into the array element at index `ix`.
///
/// On success the cursor's schema becomes the array's element schema, a new
/// `ArrayWrapper` is pushed onto the array stack and the path is replaced by
/// the element's value path. Index handling (clamping, picking a position
/// for a not-yet-existing element) is delegated to `ArrayWrapper::new`.
///
/// # Errors
/// Fails with "not an Array<_>" when the schema node is not an array, or
/// with whatever error `ArrayWrapper::new` reports. In both cases the
/// cursor is left unchanged.
pub fn index(&mut self, ix: usize) -> Result<&mut Self> {
    if let ArchivedSchema::Array(schema) = &self.schema {
        // Resolve the element before touching any cursor state: if this
        // fails, schema, path and array stack must still agree with each
        // other. `ArrayWrapper::new` only reads `path` and `crdt`.
        let (array, path) = ArrayWrapper::new(self, ix)?;
        self.schema = schema;
        self.array.push(array);
        self.path = path;
        Ok(self)
    } else {
        anyhow::bail!("not an Array<_>");
    }
}
/// Counts the stored paths below the array's `VALUES` subtree.
///
/// The path is extended temporarily for the scan and restored before
/// returning.
///
/// # Errors
/// Fails with "not an Array<_>" when the schema node is not an array.
pub fn len(&mut self) -> Result<u32> {
    if !matches!(self.schema, ArchivedSchema::Array(_)) {
        anyhow::bail!("not an Array<_>");
    }
    self.path.prim_str(array_util::ARRAY_VALUES);
    let count = self.count_path(self.path.as_path());
    self.path.pop();
    count
}
/// Returns `true` when `len` reports zero entries.
///
/// # Errors
/// Propagates any error from `len`.
pub fn is_empty(&mut self) -> Result<bool> {
    self.len().map(|count| count == 0)
}
/// Descends into the struct field named `key`.
///
/// Appends the field name to the path and switches to the field's schema.
///
/// # Errors
/// Fails with "not a struct" when the schema node is not a struct, and with
/// "field doesn't exist" when the struct has no field named `key`.
pub fn field(&mut self, key: &str) -> Result<&mut Self> {
    let fields = match self.schema {
        ArchivedSchema::Struct(fields) => fields,
        _ => return Err(anyhow!("not a struct")),
    };
    match fields.get(key) {
        Some(field_schema) => {
            self.path.prim_str(key);
            self.schema = field_schema;
            Ok(self)
        }
        None => Err(anyhow!("field doesn't exist")),
    }
}
/// Counts how many stored paths the CRDT yields for `path`.
fn count_path(&self, path: Path) -> Result<u32> {
    let total = self.crdt.scan_path(path).fold(0, |seen, _| seen + 1);
    Ok(total)
}
/// Appends a freshly drawn random nonce segment to `path`.
fn nonce(&self, path: &mut PathBuf) {
    let fresh = nonce();
    path.nonce(fresh);
}
/// Signs `path` with the cursor's key, then appends the peer id and the
/// signature as trailing segments.
///
/// The signature covers the path bytes as they are before those two
/// segments are appended.
fn sign(&self, path: &mut PathBuf) {
    tracing::debug!("signing {} as {:?}", path.as_path(), self.peer_id);
    let signature = self.key.sign(path.as_ref());
    path.peer(&self.peer_id);
    path.sig(signature);
}
/// Collects every non-policy path stored below the cursor into a set of
/// expired dots.
///
/// A stored path is classified by the segment two levels above its end;
/// paths where that segment is a policy are kept. Every other path is
/// signed by this cursor and added to the returned store.
///
/// # Errors
/// Fails with "unexpected path layout" when a stored path is too short to
/// be classified. This used to panic.
fn tombstone(&self) -> Result<DotStore> {
    let mut expired = DotStore::new();
    for k in self.crdt.scan_path(self.path.as_path()) {
        let path = Path::new(&k);
        // Report malformed entries as errors rather than panicking: this
        // function is fallible and every caller already propagates errors.
        let is_policy = path
            .parent()
            .context("unexpected path layout")?
            .parent()
            .context("unexpected path layout")?
            .last()
            .context("unexpected path layout")?
            .policy()
            .is_some();
        if !is_policy {
            let mut path = path.to_owned();
            self.sign(&mut path);
            expired.insert(path);
        }
    }
    Ok(expired)
}
/// Builds the delta that sets the flag at the current path.
///
/// The stored path is the cursor path plus a fresh nonce, signed by this
/// peer. The delta is then passed through `augment_array`.
///
/// # Errors
/// Fails with "not a flag" when the schema node is not a flag, and with
/// "unauthorized" when this peer lacks write permission.
pub fn enable(&self) -> Result<Causal> {
    if !matches!(self.schema, ArchivedSchema::Flag) {
        return Err(anyhow!("not a flag"));
    }
    let may_write = self.can(&self.peer_id, Permission::Write)?;
    if !may_write {
        return Err(anyhow!("unauthorized"));
    }
    let mut flag_path = self.path.clone();
    self.nonce(&mut flag_path);
    self.sign(&mut flag_path);
    let mut store = DotStore::new();
    store.insert(flag_path);
    self.augment_array(Causal {
        store,
        expired: Default::default(),
    })
}
/// Builds the delta that clears the flag at the current path.
///
/// Nothing is stored; all non-policy paths below the cursor are expired.
/// The delta is then passed through `augment_array`.
///
/// # Errors
/// Fails with "not a flag" when the schema node is not a flag, and with
/// "unauthorized" when this peer lacks write permission.
pub fn disable(&self) -> Result<Causal> {
    if !matches!(self.schema, ArchivedSchema::Flag) {
        return Err(anyhow!("not a flag"));
    }
    let may_write = self.can(&self.peer_id, Permission::Write)?;
    if !may_write {
        return Err(anyhow!("unauthorized"));
    }
    let store = DotStore::new();
    let expired = self.tombstone()?;
    self.augment_array(Causal { store, expired })
}
fn assign(&self, kind: PrimitiveKind) -> Result<(PathBuf, DotStore)> {
if !self.can(&self.peer_id, Permission::Write)? {
return Err(anyhow!("unauthorized"));
}
if *self.schema != ArchivedSchema::Reg(kind) {
return Err(anyhow!("not a Reg<{:?}>", kind));
}
let mut path = self.path.to_owned();
self.nonce(&mut path);
Ok((path, self.tombstone()?))
}
/// Builds the delta that assigns the `bool` `value` to the register at the
/// current path, expiring the values stored there before.
///
/// # Errors
/// Propagates the permission and schema errors from `assign`.
pub fn assign_bool(&self, value: bool) -> Result<Causal> {
    let (mut value_path, expired) = self.assign(PrimitiveKind::Bool)?;
    value_path.prim_bool(value);
    self.sign(&mut value_path);
    let mut store = DotStore::new();
    store.insert(value_path);
    self.augment_array(Causal { store, expired })
}
/// Builds the delta that assigns the `u64` `value` to the register at the
/// current path, expiring the values stored there before.
///
/// # Errors
/// Propagates the permission and schema errors from `assign`.
pub fn assign_u64(&self, value: u64) -> Result<Causal> {
    let (mut value_path, expired) = self.assign(PrimitiveKind::U64)?;
    value_path.prim_u64(value);
    self.sign(&mut value_path);
    let mut store = DotStore::new();
    store.insert(value_path);
    self.augment_array(Causal { store, expired })
}
/// Builds the delta that assigns the `i64` `value` to the register at the
/// current path, expiring the values stored there before.
///
/// # Errors
/// Propagates the permission and schema errors from `assign`.
pub fn assign_i64(&self, value: i64) -> Result<Causal> {
    let (mut value_path, expired) = self.assign(PrimitiveKind::I64)?;
    value_path.prim_i64(value);
    self.sign(&mut value_path);
    let mut store = DotStore::new();
    store.insert(value_path);
    self.augment_array(Causal { store, expired })
}
/// Builds the delta that assigns the string `value` to the register at the
/// current path, expiring the values stored there before.
///
/// # Errors
/// Propagates the permission and schema errors from `assign`.
pub fn assign_str(&self, value: &str) -> Result<Causal> {
    let (mut value_path, expired) = self.assign(PrimitiveKind::Str)?;
    value_path.prim_str(value);
    self.sign(&mut value_path);
    let mut store = DotStore::new();
    store.insert(value_path);
    self.augment_array(Causal { store, expired })
}
/// Builds the delta that removes everything except policies below the
/// current path.
///
/// # Errors
/// Fails with "unauthorized" when this peer lacks write permission.
pub fn remove(&self) -> Result<Causal> {
    let may_write = self.can(&self.peer_id, Permission::Write)?;
    if !may_write {
        return Err(anyhow!("unauthorized"));
    }
    let store = DotStore::new();
    let expired = self.tombstone()?;
    self.augment_array(Causal { store, expired })
}
/// Builds the delta that records `policy` at the current path.
///
/// Granting a controllable permission and revoking a claim both require
/// `Control`; granting any other permission requires `Own`.
///
/// # Errors
/// Fails with "unauthorized" when this peer lacks the required permission.
fn say(&self, policy: &Policy) -> Result<Causal> {
    let required = match policy {
        Policy::Can(_, perm) | Policy::CanIf(_, perm, _) => {
            if perm.controllable() {
                Permission::Control
            } else {
                Permission::Own
            }
        }
        Policy::Revokes(_) => Permission::Control,
    };
    if !self.can(&self.peer_id, required)? {
        return Err(anyhow!("unauthorized"));
    }
    let mut policy_path = self.path.clone();
    policy_path.policy(policy);
    self.sign(&mut policy_path);
    let mut store = DotStore::new();
    store.insert(policy_path);
    let expired = DotStore::new();
    self.augment_array(Causal { store, expired })
}
/// Records an unconditional grant of `perm` to `actor` at the current path.
///
/// # Errors
/// Propagates the authorization error from `say`.
pub fn say_can(&self, actor: Option<PeerId>, perm: Permission) -> Result<Causal> {
    let policy = Policy::Can(actor.into(), perm);
    self.say(&policy)
}
/// Builds a `Can` condition for `actor` and `perm` scoped to the current path.
pub fn cond(&self, actor: Actor, perm: Permission) -> Can {
    let scope = self.path.clone();
    Can::new(actor, perm, scope)
}
/// Records a grant of `perm` to `actor` that is conditional on `cond`.
///
/// # Errors
/// Propagates the authorization error from `say`.
pub fn say_can_if(&self, actor: Actor, perm: Permission, cond: Can) -> Result<Causal> {
    let policy = Policy::CanIf(actor, perm, cond);
    self.say(&policy)
}
/// Records a revocation of the policy identified by `claim`.
///
/// # Errors
/// Propagates the authorization error from `say`.
pub fn revoke(&self, claim: Dot) -> Result<Causal> {
    let policy = Policy::Revokes(claim);
    self.say(&policy)
}
/// Builds the delta that moves the innermost array element to index `to`.
///
/// The innermost `ArrayWrapper` is popped from the array stack and is not
/// pushed back.
///
/// # Errors
/// Fails with "Not inside an ORArray" when the array stack is empty;
/// otherwise propagates the wrapper's error.
pub fn r#move(&mut self, to: usize) -> Result<Causal> {
    self.array
        .pop()
        .context("Not inside an ORArray")
        .and_then(|wrapper| wrapper.r#move(self, to))
}
/// Builds the delta that deletes the innermost array element.
///
/// The innermost `ArrayWrapper` is popped from the array stack and is not
/// pushed back.
///
/// # Errors
/// Fails with "Not inside an ORArray" when the array stack is empty;
/// otherwise propagates the wrapper's error.
pub fn delete(&mut self) -> Result<Causal> {
    self.array
        .pop()
        .context("Not inside an ORArray")
        .and_then(|wrapper| wrapper.delete(self))
}
/// Passes `inner` through every wrapper on the array stack, in stack order,
/// so each enclosing array can add its bookkeeping to the delta.
///
/// # Errors
/// Stops at and returns the first error a wrapper reports.
fn augment_array(&self, inner: Causal) -> Result<Causal> {
    self.array
        .iter()
        .try_fold(inner, |causal, wrapper| wrapper.augment_causal(self, causal))
}
}
/// Draws eight random bytes from `getrandom` and reads them as a
/// little-endian `u64`.
///
/// # Panics
/// Panics when `getrandom` reports an error.
fn nonce() -> u64 {
    let mut bytes = [0u8; 8];
    getrandom::getrandom(&mut bytes).unwrap();
    u64::from_le_bytes(bytes)
}
/// Bookkeeping for one array element a cursor has descended into.
#[derive(Clone, Debug)]
struct ArrayWrapper {
/// Path of the array itself (the cursor path at the time of `index`).
array_path: PathBuf,
/// Position of the element inside the array.
pos: Fraction,
/// Unique id of the element.
uid: u64,
/// `array_path / VALUES / pos / uid`: where the element's value lives.
value_path: PathBuf,
/// `array_path / META / uid`: where the element's metadata lives.
meta_path: PathBuf,
}
impl ArrayWrapper {
/// Adds this array's bookkeeping to `inner`.
///
/// Uses `update` when something is already stored under the element's
/// value path, and `insert` otherwise.
fn augment_causal(&self, cursor: &Cursor, inner: Causal) -> Result<Causal> {
    let exists = cursor
        .crdt
        .scan_path(self.value_path.as_path())
        .next()
        .is_some();
    if exists {
        self.update(cursor, inner)
    } else {
        self.insert(cursor, inner)
    }
}
/// Resolves array index `ix` against the array at `cursor.path`.
///
/// Returns the wrapper describing the element together with the element's
/// value path (`array / VALUES / pos / uid`).
///
/// `ix` is clamped to the number of paths stored below `VALUES`. If an entry
/// exists at that index, its position and uid are reused. Otherwise a new
/// position is derived from the neighbouring entries and a fresh random uid
/// is drawn; nothing is written to the CRDT here.
///
/// # Errors
/// Fails when a stored path cannot be made relative to the cursor path or
/// does not have the layout `ArrayValueEntry::from_path` expects.
fn new(cursor: &Cursor, mut ix: usize) -> Result<(Self, PathBuf)> {
let array_path = cursor.path.clone();
// Root of the subtree that holds the element values.
let array_value_root = {
let mut p = array_path.clone();
p.prim_str(array_util::ARRAY_VALUES);
p
};
// NOTE(review): this counts stored paths, not distinct elements — confirm
// that each element contributes exactly one path below VALUES.
let len = cursor.crdt.scan_path(array_value_root.as_path()).count();
let mut iter = cursor.crdt.scan_path(array_value_root.as_path());
// Indices past the end address the slot right after the last entry.
ix = ix.min(len);
let (pos, uid) = if let Some(entry) = iter.nth(ix) {
// Existing entry: reuse its position and uid.
let p_c = cursor.path.clone();
let data = array_util::ArrayValueEntry::from_path(
Path::new(&entry).strip_prefix(p_c.as_path())?,
)
.context("Reading array data")?;
(data.pos, data.uid)
} else {
// No entry at `ix`: pick a position between the neighbours of the slot.
let (left, right) = match ix.checked_sub(1) {
Some(s) => {
// Entry `ix - 1` is the left neighbour, the one after it (if any)
// the right neighbour.
let p_c = cursor.path.clone();
let mut iter = cursor
.crdt
.scan_path(array_value_root.as_path())
.skip(s)
.map(move |iv| -> anyhow::Result<_> {
let meta = array_util::ArrayValueEntry::from_path(
Path::new(&iv).strip_prefix(p_c.as_path())?,
)?;
Ok(meta.pos)
});
(iter.next(), iter.next())
}
None => {
// `ix == 0`: there is no left neighbour; the first entry (if any)
// is the right neighbour.
let p_c = cursor.path.clone();
let mut iter =
cursor
.crdt
.scan_path(array_value_root.as_path())
.map(move |iv| {
let meta = array_util::ArrayValueEntry::from_path(
Path::new(&iv).strip_prefix(p_c.as_path())?,
)?;
Ok(meta.pos)
});
(None, iter.next())
}
};
// A missing left neighbour counts as position zero.
let left = left.transpose()?.unwrap_or_else(Fraction::zero);
let pos = if let Some(r) = right.transpose()? {
left.mid(&r)
} else {
left.succ()
};
// New elements get a fresh random uid.
(pos, nonce())
};
let value_path = {
let mut p = array_path.clone();
p.prim_str(array_util::ARRAY_VALUES);
p.position(&pos);
p.prim_u64(uid);
p
};
let meta_path = {
let mut p = array_path.clone();
p.prim_str(array_util::ARRAY_META);
p.prim_u64(uid);
p
};
Ok((
Self {
array_path,
pos,
uid,
value_path: value_path.clone(),
meta_path,
},
value_path,
))
}
/// Builds the delta that moves this element to index `to`.
///
/// `to` is clamped to the number of paths stored below `VALUES`. The new
/// position is derived from the entries at `to - 1` and `to`. Every existing
/// metadata entry is expired and re-stored with a fresh `last_move` nonce;
/// the first stored value entry is expired and re-stored at the new position.
///
/// # Errors
/// Fails with "Value does not exist!" when the element has no metadata, with
/// "Concurrent access" when nothing is stored under its value path, and when
/// a stored path does not have the expected layout.
pub fn r#move(self, cursor: &Cursor, mut to: usize) -> Result<Causal> {
let new_pos = {
let mut value_path = self.array_path.clone();
value_path.prim_str(array_util::ARRAY_VALUES);
let len = cursor.crdt.scan_path(value_path.as_path()).count();
to = to.min(len);
// NOTE(review): the neighbour scan does not exclude the element being
// moved — confirm this is intended.
let (left, right) = match to.checked_sub(1) {
Some(s) => {
// Entry `to - 1` is the left neighbour, the next one the right.
let p_c = self.array_path.clone();
let mut iter = cursor.crdt.scan_path(value_path.as_path()).skip(s).map(
move |iv| -> anyhow::Result<_> {
let meta = array_util::ArrayValueEntry::from_path(
Path::new(&iv).strip_prefix(p_c.as_path())?,
)?;
Ok(meta.pos)
},
);
(iter.next(), iter.next())
}
None => {
// `to == 0`: only a right neighbour can exist.
let p_c = self.array_path.clone();
let mut iter = cursor.crdt.scan_path(value_path.as_path()).map(move |iv| {
let meta = array_util::ArrayValueEntry::from_path(
Path::new(&iv).strip_prefix(p_c.as_path())?,
)?;
Ok(meta.pos)
});
(None, iter.next())
}
};
// A missing left neighbour counts as position zero.
let left = left.transpose()?.unwrap_or_else(Fraction::zero);
if let Some(right) = right.transpose()? {
left.mid(&right)
} else {
left.succ()
}
};
let existing_meta = cursor
.crdt
.scan_path(self.meta_path.as_path())
.collect::<Vec<_>>();
anyhow::ensure!(!existing_meta.is_empty(), "Value does not exist!");
let mut store = DotStore::new();
let mut expired = DotStore::new();
// One nonce identifies this move across all rewritten metadata entries.
let move_op = nonce();
for e in existing_meta {
let mut p = Path::new(&e).to_owned();
cursor.sign(&mut p);
// The metadata is parsed from the already re-signed path; `from_path`
// reads only the leading segments.
let mut meta = self.get_meta_data(p.as_path())?;
expired.insert(p);
meta.last_move = move_op;
let mut path = meta.to_path(self.meta_path.clone());
cursor.sign(&mut path);
store.insert(path);
}
// Only the first path stored under the value path is relocated.
let old = cursor
.crdt
.scan_path(self.value_path.as_path())
.next()
.context("Concurrent access")?;
let mut p = Path::new(&old).to_owned();
// Parse before signing: `ArrayValueEntry::from_path` expects the stored
// peer/signature pair to be the final two segments.
let mut v = self.get_value(p.as_path())?;
cursor.sign(&mut p);
expired.insert(p);
v.pos = new_pos;
let mut new_value_path = v.to_path({
let mut p = self.array_path;
p.prim_str(array_util::ARRAY_VALUES);
p
});
cursor.sign(&mut new_value_path);
store.insert(new_value_path);
Ok(Causal { store, expired })
}
/// Collects every path stored under the element's value path and metadata
/// path, each signed by `cursor`, into a set of expired dots.
fn tombstone(&self, cursor: &Cursor) -> Result<DotStore> {
    let values = cursor.crdt.scan_path(self.value_path.as_path());
    let metas = cursor.crdt.scan_path(self.meta_path.as_path());
    let mut expired = DotStore::new();
    for entry in values.chain(metas) {
        let mut stale = Path::new(&entry).to_owned();
        cursor.sign(&mut stale);
        expired.insert(stale);
    }
    Ok(expired)
}
/// Extends `inner` for a change to an element that already exists.
///
/// Everything stored below the cursor path and all of the element's
/// metadata entries are expired. One new metadata entry is stored with a
/// fresh `last_update` nonce and the `last_move` of the first existing entry.
///
/// # Errors
/// Fails with "No metadata for value entry found" when the element has no
/// metadata, or when a metadata path cannot be parsed.
fn update(&self, cursor: &Cursor, mut inner: Causal) -> Result<Causal> {
    for raw in cursor.crdt.scan_path(cursor.path.as_path()) {
        let mut stale = Path::new(&raw).to_owned();
        cursor.sign(&mut stale);
        inner.expired.insert(stale);
    }

    // Only the first metadata entry determines the carried-over `last_move`.
    let mut last_move = None;
    for raw in cursor.crdt.scan_path(self.meta_path.as_path()) {
        let mut stale = Path::new(&raw).to_owned();
        if last_move.is_none() {
            last_move = Some(self.get_meta_data(stale.as_path())?.last_move);
        }
        cursor.sign(&mut stale);
        inner.expired.insert(stale);
    }

    let last_update = nonce();
    let last_move = last_move.context("No metadata for value entry found")?;
    let refreshed = ArrayMetaEntry::new(self.uid, last_update, last_move, self.pos.clone());
    let mut meta = refreshed.to_path(self.meta_path.clone());
    cursor.sign(&mut meta);
    inner.store.insert(meta);
    Ok(inner)
}
/// Extends `inner` for a newly created element by storing its first
/// metadata entry, with fresh `last_update` and `last_move` nonces.
fn insert(&self, cursor: &Cursor, mut inner: Causal) -> Result<Causal> {
    let last_update = nonce();
    let last_move = nonce();
    let entry = ArrayMetaEntry::new(self.uid, last_update, last_move, self.pos.clone());
    let mut meta = entry.to_path(self.meta_path.clone());
    cursor.sign(&mut meta);
    inner.store.insert(meta);
    Ok(inner)
}
/// Builds the delta that deletes this element: its value and metadata
/// paths are expired and nothing is stored.
fn delete(&self, cursor: &Cursor) -> Result<Causal> {
    let expired = self.tombstone(cursor)?;
    Ok(Causal {
        store: Default::default(),
        expired,
    })
}
/// Parses `path` as a metadata entry, after making it relative to the array path.
///
/// # Errors
/// Fails when `path` is not below the array path or has an unexpected layout.
fn get_meta_data(&self, path: Path) -> Result<array_util::ArrayMetaEntry> {
    let relative = path.strip_prefix(self.array_path.as_path())?;
    array_util::ArrayMetaEntry::from_path(relative)
}
/// Parses `path` as a value entry, after making it relative to the array path.
///
/// # Errors
/// Fails when `path` is not below the array path or has an unexpected layout.
fn get_value(&self, path: Path<'_>) -> Result<array_util::ArrayValueEntry> {
    let relative = path.strip_prefix(self.array_path.as_path())?;
    array_util::ArrayValueEntry::from_path(relative)
}
}
mod array_util {
use super::*;
use crate::Segment;
use anyhow::Context;
pub(crate) const ARRAY_VALUES: &str = "VALUES";
pub(crate) const ARRAY_META: &str = "META";
/// Parsed form of a path stored below an array's `VALUES` subtree.
pub(crate) struct ArrayValueEntry {
/// Position of the element inside the array.
pub(crate) pos: Fraction,
/// Unique id of the element.
pub(crate) uid: u64,
/// Segments that follow the uid, without the trailing peer and signature.
pub(crate) value: Vec<Segment>,
}
impl ArrayValueEntry {
/// Parses a path relative to the array root into a value entry.
///
/// Expected layout: `"VALUES"`, position, `u64` uid, one segment that is
/// discarded, the value segments, then a peer segment and a signature
/// segment.
///
/// # Errors
/// Fails with "Unexpected layout" when a segment is missing or has the
/// wrong kind.
pub(crate) fn from_path(path: Path<'_>) -> Result<ArrayValueEntry> {
let mut path = path.into_iter();
anyhow::ensure!(
path.next()
.context("Unexpected layout")?
.prim_str()
.context("Unexpected layout")?
== ARRAY_VALUES,
"Unexpected layout"
);
let pos = path
.next()
.context("Unexpected layout")?
.position()
.context("Unexpected layout")?;
let uid = path
.next()
.context("Unexpected layout")?
.prim_u64()
.context("Unexpected layout")?;
// NOTE(review): the segment after the uid is dropped and `to_path` does not
// write it back — confirm which segment this is and that losing it is intended.
path.next();
let mut value = path.collect::<Vec<_>>();
// The stored path ends with peer then signature; strip both from the value.
anyhow::ensure!(
matches!(value.pop(), Some(Segment::Sig(_))),
"Unexpected layout"
);
anyhow::ensure!(
matches!(value.pop(), Some(Segment::Peer(_))),
"Unexpected layout"
);
Ok(Self { uid, pos, value })
}
/// Appends position, uid and the value segments to `base` and returns it.
///
/// `base` is expected to end at the array's `VALUES` segment already.
pub(crate) fn to_path(&self, mut base: PathBuf) -> PathBuf {
    base.position(&self.pos);
    base.prim_u64(self.uid);
    self.value.iter().cloned().for_each(|segment| {
        base.push_segment(segment);
    });
    base
}
}
/// Parsed form of a path stored below an array's `META` subtree.
#[allow(dead_code)]
pub(crate) struct ArrayMetaEntry {
/// Nonce drawn when the metadata entry was last rewritten by an update.
pub(crate) last_update: u64,
/// Nonce identifying the last move of the element.
pub(crate) last_move: u64,
/// Unique id of the element.
pub(crate) uid: u64,
/// Position of the element inside the array.
pub(crate) pos: Fraction,
}
impl ArrayMetaEntry {
/// Bundles the given values into a metadata entry.
pub(crate) fn new(uid: u64, last_update: u64, last_move: u64, pos: Fraction) -> Self {
    Self {
        last_update,
        last_move,
        uid,
        pos,
    }
}
/// Parses a path relative to the array root into a metadata entry.
///
/// Expected layout: `"META"`, `u64` uid, `u64` last_update, `u64`
/// last_move, position. Any segments after the position are ignored.
///
/// # Errors
/// Fails with "Unexpected layout" when a segment is missing or has the
/// wrong kind.
pub(crate) fn from_path(path: Path) -> Result<Self> {
let mut path = path.into_iter();
anyhow::ensure!(
path.next()
.context("Unexpected layout")?
.prim_str()
.context("Unexpected layout")?
== ARRAY_META,
"Unexpected layout"
);
let uid = path
.next()
.context("Unexpected layout")?
.prim_u64()
.context("Unexpected layout")?;
let last_update = path
.next()
.context("Unexpected layout")?
.prim_u64()
.context("Unexpected layout")?;
let last_move = path
.next()
.context("Unexpected layout")?
.prim_u64()
.context("Unexpected layout")?;
let pos = path
.next()
.context("Unexpected layout")?
.position()
.context("Unexpected layout")?;
Ok(Self {
last_update,
last_move,
pos,
uid,
})
}
/// Appends `last_update`, `last_move`, the position and a fresh random
/// nonce to `base` and returns it.
///
/// The uid is not written; `base` is expected to end with it already.
pub(crate) fn to_path(&self, mut base: PathBuf) -> PathBuf {
    base.prim_u64(self.last_update);
    base.prim_u64(self.last_move);
    base.position(&self.pos);
    let tag = nonce();
    base.prim_u64(tag);
    base
}
}
}