use automerge::{AutomergeError, Change, ChangeHash, ObjId};
use std::collections::{HashMap, HashSet, VecDeque};
use super::{
read_child_document_url_and_secret, read_child_document_url_and_secrets, save_child_document,
AutomergeDoc,
};
use crate::{
common::{
cipher::{decode_doc_url, decode_document_secret_bytes, DecodedDocUrl},
entry::{split_change_into_entries, Entry, EntryContent, ShrunkEntries},
},
DocumentId, FeedDiscoveryKey, PeermergeError,
};
/// Feed length recorded for one feed after entries have been applied.
#[derive(Debug)]
pub(crate) struct ApplyEntriesFeedChange {
    /// Most recently recorded length of the feed.
    pub(crate) length: u64,
}

impl ApplyEntriesFeedChange {
    /// Builds a record that holds `length`.
    pub(crate) fn new(length: u64) -> Self {
        ApplyEntriesFeedChange { length }
    }

    /// Replaces the recorded length with `length`.
    pub(crate) fn set_length(&mut self, length: u64) {
        *self = Self::new(length);
    }
}
/// Entries that have been received but cannot be applied yet, grouped by the
/// feed they arrived on.
#[derive(Debug)]
pub(crate) struct UnappliedEntries {
    /// Per-feed state keyed by discovery key. The tuple holds:
    /// - `.0`: the feed length passed to the most recent [`UnappliedEntries::add`] call,
    /// - `.1`: the queued entries, oldest first,
    /// - `.2`: the `reserved` object ids given when the feed's queue was created.
    data: HashMap<FeedDiscoveryKey, (u64, VecDeque<Entry>, Vec<ObjId>)>,
    /// Object ids that keep a feed's queue from being consolidated while any
    /// of them appears in that feed's reserved list.
    pub(crate) reserved_ids: HashSet<ObjId>,
}

impl UnappliedEntries {
    /// Creates an empty queue with no reserved object ids.
    pub(crate) fn new() -> Self {
        Self {
            data: HashMap::new(),
            reserved_ids: HashSet::new(),
        }
    }

    /// Queues `entry` for the feed identified by `discovery_key`.
    ///
    /// `length` is the feed length once `entry` is included; it overwrites any
    /// previously stored length for the feed. `reserved` is stored only when
    /// this call creates the feed's queue.
    pub(crate) fn add(
        &mut self,
        discovery_key: &FeedDiscoveryKey,
        length: u64,
        entry: Entry,
        reserved: Vec<ObjId>,
    ) {
        if let Some(value) = self.data.get_mut(discovery_key) {
            value.0 = length;
            value.1.push_back(entry);
            // NOTE(review): `reserved` is dropped when the feed already has a
            // queue, so reservations of later entries are not tracked. Confirm
            // this is intended before relying on it.
        } else {
            self.data
                .insert(*discovery_key, (length, VecDeque::from([entry]), reserved));
        }
    }

    /// Moves every queued entry that has become applicable out of the queue.
    ///
    /// Feeds are scanned repeatedly until a full pass makes no progress. Within
    /// a feed, entries are handled strictly in order and scanning stops at the
    /// first change whose dependencies are not yet known.
    ///
    /// * Change entries are not applied here: they are pushed to
    ///   `meta_changes_to_apply` / `user_changes_to_apply` for the caller to
    ///   apply. A dependency counts as known when it is already in the
    ///   respective list or present in the respective document.
    /// * `InitPeer` entries are merged into the documents immediately.
    /// * `result` receives, per feed, the feed length after the last entry
    ///   that was taken out of the queue.
    ///
    /// # Panics
    ///
    /// Panics when a queued change entry has no decoded change, when `InitPeer`
    /// document data cannot be loaded or merged, or when the queue contains any
    /// other entry kind. This method has no error channel to report those.
    pub(crate) fn consolidate(
        &mut self,
        meta_automerge_doc: &mut AutomergeDoc,
        user_automerge_doc: &mut AutomergeDoc,
        meta_changes_to_apply: &mut Vec<Change>,
        user_changes_to_apply: &mut Vec<Change>,
        result: &mut HashMap<[u8; 32], ApplyEntriesFeedChange>,
    ) {
        // Hashes of changes that are either scheduled for application or
        // already known to be in the corresponding document.
        let mut meta_hashes: HashSet<ChangeHash> = meta_changes_to_apply
            .iter()
            .map(|change| change.hash())
            .collect();
        let mut user_hashes: HashSet<ChangeHash> = user_changes_to_apply
            .iter()
            .map(|change| change.hash())
            .collect();
        let reserved_ids = &self.reserved_ids;
        loop {
            let mut changed = false;
            for (discovery_key, value) in self.data.iter_mut() {
                if value
                    .2
                    .iter()
                    .any(|reserved_obj| reserved_ids.contains(reserved_obj))
                {
                    // Still blocked by a reservation, leave the feed untouched.
                    continue;
                } else if !value.2.is_empty() {
                    value.2 = vec![];
                }

                // Every queued entry occupies `1 + part_count` feed slots, so
                // the feed index of the first queued entry is the stored
                // length minus the slots taken by the whole queue.
                let original_start_index = value.0
                    - value
                        .1
                        .iter()
                        .fold(0, |acc, entry| acc + 1 + entry.part_count)
                        as u64;
                let mut new_length = original_start_index;
                // Number of queue entries consumed in this pass. Tracked
                // separately from `new_length` because one entry can advance
                // the feed length by more than one.
                let mut applied_count: usize = 0;

                for entry in value.1.iter() {
                    match &entry.content {
                        EntryContent::Change { meta, change, .. } => {
                            let change = change.as_ref().unwrap();
                            let (doc, known_hashes, changes_to_apply) = if *meta {
                                (
                                    &mut *meta_automerge_doc,
                                    &mut meta_hashes,
                                    &mut *meta_changes_to_apply,
                                )
                            } else {
                                (
                                    &mut *user_automerge_doc,
                                    &mut user_hashes,
                                    &mut *user_changes_to_apply,
                                )
                            };
                            let deps_satisfied = change.deps().iter().all(|dep| {
                                if known_hashes.contains(dep) {
                                    true
                                } else if doc.get_change_by_hash(dep).is_some() {
                                    // Cache the hit to skip the document lookup next time.
                                    known_hashes.insert(*dep);
                                    true
                                } else {
                                    false
                                }
                            });
                            if !deps_satisfied {
                                // Later entries of this feed must wait as well.
                                break;
                            }
                            changes_to_apply.push(*change.clone());
                            known_hashes.insert(change.hash());
                        }
                        EntryContent::InitPeer {
                            meta_doc_data,
                            user_doc_data,
                            ..
                        } => {
                            let mut changed_meta_automerge_doc =
                                AutomergeDoc::load(meta_doc_data).unwrap();
                            meta_automerge_doc
                                .merge(&mut changed_meta_automerge_doc)
                                .unwrap();
                            if let Some(user_doc_data) = user_doc_data {
                                let mut changed_user_automerge_doc =
                                    AutomergeDoc::load(user_doc_data).unwrap();
                                user_automerge_doc
                                    .merge(&mut changed_user_automerge_doc)
                                    .unwrap();
                            }
                        }
                        _ => panic!("Unexpected entry {entry:?}"),
                    }

                    // The entry was consumed: advance the feed length and report it.
                    applied_count += 1;
                    new_length += 1 + entry.part_count as u64;
                    result
                        .entry(*discovery_key)
                        .and_modify(|feed_change| feed_change.set_length(new_length))
                        .or_insert_with(|| ApplyEntriesFeedChange::new(new_length));
                    changed = true;
                }

                // Drop exactly the consumed entries. Using the feed-length
                // delta here would over-remove whenever an entry spans several
                // feed slots (`part_count > 0`).
                for _ in 0..applied_count {
                    value.1.pop_front();
                }
            }
            if !changed {
                self.data.retain(|_, v| !v.1.is_empty());
                break;
            }
        }
    }
}
/// Pair of flags, one for the meta document and one for the user document.
///
/// NOTE(review): nothing in this part of the file constructs or reads this
/// type; the field names suggest each flag reports whether that document
/// changed. Confirm at the use site.
#[derive(Debug)]
pub(crate) struct DocsChangeResult {
/// Flag for the meta document.
pub(crate) meta_changed: bool,
/// Flag for the user document.
pub(crate) user_changed: bool,
}
/// Applies newly received entries of one feed to the meta and user documents.
///
/// The feed length before the first entry is computed as `contiguous_length`
/// minus the number of entries minus `shrunk_entries.shrunk_count`; every
/// entry then advances it by `1 + part_count`.
///
/// * A change entry is applied right away when it touches no reserved object
///   (checked for user changes only) and all of its dependencies are already
///   in the target document. Otherwise it is queued in `unapplied_entries`.
/// * An `InitPeer` entry is loaded and merged into the documents.
///
/// After the incoming entries are handled, `unapplied_entries` is
/// consolidated and any changes released by that are applied as well.
///
/// Returns, per feed discovery key, the feed length up to which entries have
/// been applied by this call.
///
/// # Errors
///
/// Returns an error when automerge rejects a change, or when `InitPeer`
/// document data cannot be loaded or merged.
///
/// # Panics
///
/// Panics when a change entry carries no decoded change or when an entry of
/// an unexpected kind is encountered.
pub(crate) fn apply_entries_autocommit(
    meta_automerge_doc: &mut AutomergeDoc,
    user_automerge_doc: &mut AutomergeDoc,
    discovery_key: &FeedDiscoveryKey,
    contiguous_length: u64,
    shrunk_entries: ShrunkEntries,
    unapplied_entries: &mut UnappliedEntries,
) -> Result<HashMap<[u8; 32], ApplyEntriesFeedChange>, PeermergeError> {
    let mut result: HashMap<[u8; 32], ApplyEntriesFeedChange> = HashMap::new();
    let len = shrunk_entries.entries.len() as u64;
    let mut length = contiguous_length - len - shrunk_entries.shrunk_count;
    for entry in shrunk_entries.entries.into_iter() {
        length += 1 + entry.part_count as u64;
        match entry.content {
            EntryContent::Change {
                meta, ref change, ..
            } => {
                let change = change.as_ref().unwrap();
                // Collect the reserved objects this user change operates on.
                // Skipped entirely when nothing is reserved.
                let reserved: Vec<ObjId> = if !meta && !unapplied_entries.reserved_ids.is_empty() {
                    change
                        .decode()
                        .operations
                        .iter()
                        .filter_map(|op| {
                            let obj_id_as_string: &str = &op.obj.to_string();
                            if let Ok((obj, _)) = user_automerge_doc.import(obj_id_as_string) {
                                if unapplied_entries.reserved_ids.contains(&obj) {
                                    Some(obj)
                                } else {
                                    None
                                }
                            } else {
                                None
                            }
                        })
                        .collect()
                } else {
                    vec![]
                };
                if reserved.is_empty()
                    && change.deps().iter().all(|dep| {
                        if meta {
                            meta_automerge_doc.get_change_by_hash(dep).is_some()
                        } else {
                            user_automerge_doc.get_change_by_hash(dep).is_some()
                        }
                    })
                {
                    if meta {
                        meta_automerge_doc.apply_changes([*change.clone()])?;
                    } else {
                        user_automerge_doc.apply_changes([*change.clone()])?;
                    }
                    result
                        .entry(*discovery_key)
                        .and_modify(|feed_change| feed_change.set_length(length))
                        .or_insert_with(|| ApplyEntriesFeedChange::new(length));
                } else {
                    unapplied_entries.add(discovery_key, length, entry, reserved);
                };
            }
            EntryContent::InitPeer {
                meta_doc_data,
                user_doc_data,
                ..
            } => {
                // The document bytes come from a peer, so malformed data is
                // reported as an error instead of aborting the process.
                let mut changed_meta_automerge_doc = AutomergeDoc::load(&meta_doc_data)?;
                meta_automerge_doc.merge(&mut changed_meta_automerge_doc)?;
                if let Some(user_doc_data) = user_doc_data {
                    let mut changed_user_automerge_doc = AutomergeDoc::load(&user_doc_data)?;
                    user_automerge_doc.merge(&mut changed_user_automerge_doc)?;
                }
                result
                    .entry(*discovery_key)
                    .and_modify(|feed_change| feed_change.set_length(length))
                    .or_insert_with(|| ApplyEntriesFeedChange::new(length));
            }
            _ => {
                panic!("Unexpected entry {entry:?}");
            }
        }
    }

    // Entries applied above may have unblocked queued ones. Collect those and
    // apply them in one batch per document.
    let mut meta_changes_to_apply: Vec<Change> = vec![];
    let mut user_changes_to_apply: Vec<Change> = vec![];
    unapplied_entries.consolidate(
        meta_automerge_doc,
        user_automerge_doc,
        &mut meta_changes_to_apply,
        &mut user_changes_to_apply,
        &mut result,
    );
    if !meta_changes_to_apply.is_empty() {
        meta_automerge_doc.apply_changes(meta_changes_to_apply)?;
    }
    if !user_changes_to_apply.is_empty() {
        user_automerge_doc.apply_changes(user_changes_to_apply)?;
    }
    Ok(result)
}
/// Retries the entries queued in `unapplied_entries` without adding new ones.
///
/// The queue is consolidated against the current state of both documents and
/// every change released by that is applied to its document.
///
/// Returns, per feed discovery key, the feed length up to which queued
/// entries were taken out of the queue.
///
/// # Errors
///
/// Returns an error when automerge rejects one of the released changes.
pub(crate) fn apply_unapplied_entries_autocommit(
    meta_automerge_doc: &mut AutomergeDoc,
    user_automerge_doc: &mut AutomergeDoc,
    unapplied_entries: &mut UnappliedEntries,
) -> Result<HashMap<[u8; 32], ApplyEntriesFeedChange>, PeermergeError> {
    let mut result: HashMap<[u8; 32], ApplyEntriesFeedChange> = HashMap::new();
    let mut meta_changes_to_apply: Vec<Change> = vec![];
    let mut user_changes_to_apply: Vec<Change> = vec![];
    unapplied_entries.consolidate(
        meta_automerge_doc,
        user_automerge_doc,
        &mut meta_changes_to_apply,
        &mut user_changes_to_apply,
        &mut result,
    );
    // Consolidation has already removed these changes from the queue and
    // advanced the reported feed lengths, so both lists must be applied.
    // Skipping the meta list would silently lose those changes.
    if !meta_changes_to_apply.is_empty() {
        meta_automerge_doc.apply_changes(meta_changes_to_apply)?;
    }
    if !user_changes_to_apply.is_empty() {
        user_automerge_doc.apply_changes(user_changes_to_apply)?;
    }
    Ok(result)
}
/// Runs the mutating callback `cb` on `automerge_doc` and returns the entries
/// for the resulting local change together with the callback's output.
///
/// The last local change of the document, if there is one, is split into
/// entries with `split_change_into_entries` using `meta` and
/// `max_entry_data_size_bytes`; without a local change the entry list is empty.
///
/// # Errors
///
/// Returns the callback's error converted into a [`PeermergeError`]. In that
/// case no entries are produced.
pub(crate) fn transact_mut_autocommit<F, O>(
    meta: bool,
    automerge_doc: &mut AutomergeDoc,
    max_entry_data_size_bytes: usize,
    cb: F,
) -> Result<(Vec<Entry>, O), PeermergeError>
where
    F: FnOnce(&mut AutomergeDoc) -> Result<O, AutomergeError>,
{
    // A failing callback is a recoverable condition for the caller, so it is
    // propagated through the declared `Result` instead of panicking.
    let result = cb(automerge_doc)?;
    let entries: Vec<Entry> = automerge_doc
        .get_last_local_change()
        .map(|change| split_change_into_entries(meta, change.clone(), max_entry_data_size_bytes))
        .unwrap_or_default();
    Ok((entries, result))
}
/// Runs the read-only callback `cb` on `automerge_doc` and returns its output.
///
/// # Errors
///
/// Returns the callback's error converted into a [`PeermergeError`].
pub(crate) fn transact_autocommit<F, O>(
    automerge_doc: &AutomergeDoc,
    cb: F,
) -> Result<O, PeermergeError>
where
    F: FnOnce(&AutomergeDoc) -> Result<O, AutomergeError>,
{
    // Propagate the callback's failure through the declared `Result` instead
    // of panicking.
    Ok(cb(automerge_doc)?)
}
/// Stores a child document reference in the meta document and returns the
/// entries describing that change.
///
/// The reference is written with `save_child_document`. The last local change
/// of the meta document, if any, is then split into meta entries limited by
/// `max_entry_data_size_bytes`, and finally the document's diff cursor is
/// updated.
///
/// # Errors
///
/// Returns the error from `save_child_document` when saving fails.
pub(crate) fn add_child_document(
    meta_automerge_doc: &mut AutomergeDoc,
    child_document_id: DocumentId,
    child_document_url: &str,
    child_document_secret: Vec<u8>,
    max_entry_data_size_bytes: usize,
) -> Result<Vec<Entry>, PeermergeError> {
    save_child_document(
        meta_automerge_doc,
        child_document_id,
        child_document_url,
        child_document_secret,
    )?;

    let entries: Vec<Entry> = match meta_automerge_doc.get_last_local_change() {
        Some(last_change) => {
            split_change_into_entries(true, last_change.clone(), max_entry_data_size_bytes)
        }
        None => Vec::new(),
    };

    meta_automerge_doc.update_diff_cursor();
    Ok(entries)
}
/// Looks up the child document `child_document_id` in the meta document and
/// returns its decoded URL, or `None` when no URL and secret are stored for it.
///
/// # Panics
///
/// Panics when the stored secret or the stored URL fails to decode.
pub(crate) fn get_child_document_decoded_url(
    meta_automerge_doc: &AutomergeDoc,
    child_document_id: DocumentId,
) -> Option<DecodedDocUrl> {
    let (stored_url, stored_secret_bytes) =
        read_child_document_url_and_secret(meta_automerge_doc, child_document_id)?;
    let secret = decode_document_secret_bytes(&stored_secret_bytes)
        .expect("Stored document secret should not be invalid");
    let decoded = decode_doc_url(&stored_url, &Some(secret))
        .expect("Stored document URL should not be invalid");
    Some(decoded)
}
/// Returns the decoded URL of every child document stored in the meta
/// document, in the order `read_child_document_url_and_secrets` yields them.
///
/// # Panics
///
/// Panics when a stored secret or a stored URL fails to decode.
pub(crate) fn get_child_document_decoded_urls(
    meta_automerge_doc: &AutomergeDoc,
) -> Vec<DecodedDocUrl> {
    let mut decoded_urls: Vec<DecodedDocUrl> = Vec::new();
    for (stored_url, stored_secret_bytes) in
        read_child_document_url_and_secrets(meta_automerge_doc)
    {
        let secret = decode_document_secret_bytes(&stored_secret_bytes)
            .expect("Stored document secret should not be invalid");
        let decoded = decode_doc_url(&stored_url, &Some(secret))
            .expect("Stored document URL should not be invalid");
        decoded_urls.push(decoded);
    }
    decoded_urls
}
// Tests for applying change entries to the user document, including entries
// that arrive out of order and through different feeds.
#[cfg(test)]
mod tests {
use automerge::{transaction::Transactable, ObjType, Prop, ReadDoc, ScalarValue, ROOT};
use super::*;
use crate::{
common::{
constants::DEFAULT_MAX_ENTRY_DATA_SIZE_BYTES, entry::shrink_entries,
keys::generate_keys,
},
crdt::{init_automerge_doc_from_data, init_automerge_docs},
uuid::Uuid,
};
// Asserts that property `prop` of object `key` in `doc` is the integer `expected`.
fn assert_int_value(doc: &AutomergeDoc, key: &ObjId, prop: &str, expected: i64) {
let value = doc.get(key, prop).unwrap().unwrap();
let actual = value.0.to_scalar().unwrap();
assert_eq!(actual, &ScalarValue::Int(expected));
}
// Creates a new object at `prop` of `obj` and returns the document's last
// local change wrapped with `Entry::new_change(false, 0, ..)`, together with
// the id of the created object.
fn put_object_autocommit<O: AsRef<ObjId>, P: Into<Prop>>(
automerge_doc: &mut AutomergeDoc,
obj: O,
prop: P,
object: ObjType,
) -> Result<(Entry, ObjId), PeermergeError> {
let id = automerge_doc.put_object(obj, prop, object)?;
let mut change = automerge_doc.get_last_local_change().unwrap().clone();
Ok((Entry::new_change(false, 0, change.bytes().to_vec()), id))
}
// Writes the scalar `value` at `prop` of `obj` and returns the document's
// last local change wrapped with `Entry::new_change(false, 0, ..)`.
fn put_scalar_autocommit<O: AsRef<ObjId>, P: Into<Prop>, V: Into<ScalarValue>>(
automerge_doc: &mut AutomergeDoc,
obj: O,
prop: P,
value: V,
) -> Result<Entry, PeermergeError> {
automerge_doc.put(obj, prop, value)?;
let mut change = automerge_doc.get_last_local_change().unwrap().clone();
Ok(Entry::new_change(false, 0, change.bytes().to_vec()))
}
#[test]
fn automerge_edit_apply_entries() -> anyhow::Result<()> {
let uuid = Uuid::new_v4();
let peer_id = uuid.as_bytes();
let int_prop = "number";
let int_value = 1;
let (_, doc_discovery_key) = generate_keys();
let (_, peer_1_discovery_key) = generate_keys();
let (_, peer_2_discovery_key) = generate_keys();
let (result, _, _) = init_automerge_docs(
doc_discovery_key,
peer_id,
false,
DEFAULT_MAX_ENTRY_DATA_SIZE_BYTES,
|tx| tx.put(ROOT, "version", 1),
)
.unwrap();
let mut meta_doc = result.meta_automerge_doc;
let mut user_doc = result.user_automerge_doc;
// Build five nested maps and a scalar inside the innermost one. Each step
// yields one entry, and each object is created inside the previous one.
let (entry_1, key_1) =
put_object_autocommit(&mut user_doc, ROOT, "level_1", automerge::ObjType::Map)?;
let (entry_2, key_2) =
put_object_autocommit(&mut user_doc, &key_1, "level_2", automerge::ObjType::Map)?;
let (entry_3, key_3) =
put_object_autocommit(&mut user_doc, &key_2, "level_3", automerge::ObjType::Map)?;
let (entry_4, key_4) =
put_object_autocommit(&mut user_doc, &key_3, "level_4", automerge::ObjType::Map)?;
let (entry_5, key_5) =
put_object_autocommit(&mut user_doc, &key_4, "level_5", automerge::ObjType::Map)?;
let entry_scalar = put_scalar_autocommit(&mut user_doc, &key_5, int_prop, int_value)?;
// Case 1: all six entries in creation order in one batch; nothing may stay queued.
let mut unapplied_entries = UnappliedEntries::new();
apply_entries_autocommit(
&mut meta_doc,
&mut user_doc,
&doc_discovery_key,
7,
shrink_entries(vec![
entry_1.clone(),
entry_2.clone(),
entry_3.clone(),
entry_4.clone(),
entry_5.clone(),
entry_scalar.clone(),
]),
&mut unapplied_entries,
)?;
assert_eq!(unapplied_entries.data.len(), 0);
assert_int_value(&user_doc, &key_5, int_prop, int_value);
// Case 2: rebuild the user document from the initial data and feed the same
// entries in three in-order batches; the queue stays empty after each batch.
user_doc = init_automerge_doc_from_data(peer_id, &result.user_doc_data);
apply_entries_autocommit(
&mut meta_doc,
&mut user_doc,
&doc_discovery_key,
3,
shrink_entries(vec![entry_1.clone(), entry_2.clone()]),
&mut unapplied_entries,
)?;
assert_eq!(unapplied_entries.data.len(), 0);
apply_entries_autocommit(
&mut meta_doc,
&mut user_doc,
&doc_discovery_key,
5,
shrink_entries(vec![entry_3.clone(), entry_4.clone()]),
&mut unapplied_entries,
)?;
assert_eq!(unapplied_entries.data.len(), 0);
apply_entries_autocommit(
&mut meta_doc,
&mut user_doc,
&doc_discovery_key,
7,
shrink_entries(vec![entry_5.clone(), entry_scalar.clone()]),
&mut unapplied_entries,
)?;
assert_eq!(unapplied_entries.data.len(), 0);
assert_int_value(&user_doc, &key_5, int_prop, int_value);
// Case 3: rebuild again and deliver entries 2..scalar without entry_1; all
// five are expected to be queued under the doc feed.
user_doc = init_automerge_doc_from_data(peer_id, &result.user_doc_data);
apply_entries_autocommit(
&mut meta_doc,
&mut user_doc,
&doc_discovery_key,
6,
shrink_entries(vec![
entry_2.clone(),
entry_3.clone(),
entry_4.clone(),
entry_5.clone(),
entry_scalar.clone(),
]),
&mut unapplied_entries,
)?;
assert_eq!(unapplied_entries.data.len(), 1);
assert_eq!(
unapplied_entries
.data
.get(&doc_discovery_key)
.unwrap()
.1
.len(),
5
);
// Delivering entry_1 through another feed must drain the whole queue.
apply_entries_autocommit(
&mut meta_doc,
&mut user_doc,
&peer_1_discovery_key,
1,
shrink_entries(vec![entry_1.clone()]),
&mut unapplied_entries,
)?;
assert_eq!(unapplied_entries.data.len(), 0);
assert_int_value(&user_doc, &key_5, int_prop, int_value);
// Case 4: rebuild again and spread the entries over three feeds.
// peer_1 delivers entries 2, 4 and the scalar: all three are queued.
user_doc = init_automerge_doc_from_data(peer_id, &result.user_doc_data);
apply_entries_autocommit(
&mut meta_doc,
&mut user_doc,
&peer_1_discovery_key,
4,
shrink_entries(vec![entry_2, entry_4, entry_scalar]),
&mut unapplied_entries,
)?;
assert_eq!(unapplied_entries.data.len(), 1);
assert_eq!(
unapplied_entries
.data
.get(&peer_1_discovery_key)
.unwrap()
.1
.len(),
3
);
// peer_2 delivers entries 3 and 5: both are queued and peer_1's queue is unchanged.
apply_entries_autocommit(
&mut meta_doc,
&mut user_doc,
&peer_2_discovery_key,
3,
shrink_entries(vec![entry_3, entry_5]),
&mut unapplied_entries,
)?;
assert_eq!(unapplied_entries.data.len(), 2);
assert_eq!(
unapplied_entries
.data
.get(&peer_1_discovery_key)
.unwrap()
.1
.len(),
3
);
assert_eq!(
unapplied_entries
.data
.get(&peer_2_discovery_key)
.unwrap()
.1
.len(),
2
);
// entry_1 arrives on the doc feed; both queues must drain across feeds.
apply_entries_autocommit(
&mut meta_doc,
&mut user_doc,
&doc_discovery_key,
2,
shrink_entries(vec![entry_1]),
&mut unapplied_entries,
)?;
assert_eq!(unapplied_entries.data.len(), 0);
assert_int_value(&user_doc, &key_5, int_prop, int_value);
Ok(())
}
}