1use std::collections::HashMap;
13use std::collections::HashSet;
14use std::fmt;
15
16use bloomfilter::Bloom;
17use zeroize::Zeroize;
18
19use crate::errors::*;
20#[allow(unused_imports)]
21use crate::log::*;
22use crate::object::*;
23use crate::store::Store;
24use crate::types::*;
25use crate::utils::encrypt_in_place;
26
27impl BranchV0 {
28 pub fn new(
29 id: PubKey,
30 repo: ObjectRef,
31 root_branch_readcap_id: ObjectId,
32 topic_priv: PrivKey,
33 metadata: Vec<u8>,
34 ) -> BranchV0 {
35 let topic_privkey: Vec<u8> = vec![];
36 let topic = topic_priv.to_pub();
38 BranchV0 {
39 id,
40 content_type: BranchContentType::None,
41 repo,
42 root_branch_readcap_id,
43 topic,
44 topic_privkey,
45 pulled_from: vec![],
46 metadata,
47 }
48 }
49}
50
51#[allow(dead_code)]
52#[derive(Debug)]
53pub struct DagNode {
54 pub future: HashSet<ObjectId>,
55 pub past: HashSet<ObjectId>,
56}
57
58#[allow(dead_code)]
59struct Dag<'a>(&'a HashMap<Digest, DagNode>);
60
61impl fmt::Display for DagNode {
62 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63 for fu in self.future.iter() {
64 write!(f, "{} ", fu)?;
65 }
66 Ok(())
67 }
68}
69
70impl<'a> fmt::Display for Dag<'a> {
71 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
72 for node in self.0.iter() {
73 writeln!(f, "ID: {} FUTURES: {}", node.0, node.1)?;
74 }
75 Ok(())
76 }
77}
78
79impl DagNode {
80 fn new() -> Self {
81 Self {
82 future: HashSet::new(),
83 past: HashSet::new(),
84 }
85 }
86 fn collapse(
87 id: &ObjectId,
88 dag: &HashMap<ObjectId, DagNode>,
89 dag_ids: &HashSet<ObjectId>,
90 already_in: &mut HashSet<ObjectId>,
91 ) -> Vec<ObjectId> {
92 let this = dag.get(id).unwrap();
93 let intersec = this
94 .past
95 .intersection(dag_ids)
96 .cloned()
97 .collect::<HashSet<ObjectId>>();
98 if intersec.len() > 1 && !intersec.is_subset(already_in) {
99 vec![]
102 } else {
103 let mut res = vec![*id];
104 already_in.insert(*id);
105 for child in this.future.iter() {
106 log_debug!("child of {} : {}", id, child);
107 res.append(&mut Self::collapse(child, dag, dag_ids, already_in));
108 }
109 res
110 }
111 }
112}
113
114impl Branch {
115 fn encrypt_topic_priv_key(
121 mut plaintext: Vec<u8>,
122 topic_id: TopicId,
123 branch_id: BranchId,
124 repo_write_cap_secret: &RepoWriteCapSecret,
125 ) -> Vec<u8> {
126 let repo_write_cap_secret = serde_bare::to_vec(repo_write_cap_secret).unwrap();
127 let topic_id = serde_bare::to_vec(&topic_id).unwrap();
128 let branch_id = serde_bare::to_vec(&branch_id).unwrap();
129 let mut key_material = [repo_write_cap_secret, topic_id, branch_id].concat();
130 let mut key: [u8; 32] = blake3::derive_key(
131 "NextGraph Branch WriteCap Secret BLAKE3 key",
132 key_material.as_slice(),
133 );
134 encrypt_in_place(&mut plaintext, key, [0; 12]);
135 key.zeroize();
136 key_material.zeroize();
137 plaintext
138 }
139
140 pub fn encrypt_branch_write_cap_secret(
141 privkey: &BranchWriteCapSecret,
142 topic_id: TopicId,
143 branch_id: BranchId,
144 repo_write_cap_secret: &RepoWriteCapSecret,
145 ) -> Vec<u8> {
146 let plaintext = serde_bare::to_vec(privkey).unwrap();
147 Branch::encrypt_topic_priv_key(plaintext, topic_id, branch_id, repo_write_cap_secret)
148 }
149
150 pub fn decrypt_branch_write_cap_secret(
151 ciphertext: Vec<u8>,
152 topic_id: TopicId,
153 branch_id: BranchId,
154 repo_write_cap_secret: &RepoWriteCapSecret,
155 ) -> Result<BranchWriteCapSecret, NgError> {
156 let plaintext =
157 Branch::encrypt_topic_priv_key(ciphertext, topic_id, branch_id, repo_write_cap_secret);
158 Ok(serde_bare::from_slice(&plaintext)?)
159 }
160
161 pub fn new(
162 id: PubKey,
163 repo: ObjectRef,
164 root_branch_readcap_id: ObjectId,
165 topic_priv: PrivKey,
166 metadata: Vec<u8>,
167 ) -> Branch {
168 Branch::V0(BranchV0::new(
169 id,
170 repo,
171 root_branch_readcap_id,
172 topic_priv,
173 metadata,
174 ))
175 }
176
177 pub fn load_causal_past(
183 cobj: &Object,
184 store: &Store,
185 theirs: &HashSet<ObjectId>,
186 visited: &mut HashMap<ObjectId, DagNode>,
187 missing: &mut Option<&mut HashSet<ObjectId>>,
188 future: Option<ObjectId>,
189 theirs_found: &mut Option<&mut HashSet<ObjectId>>,
190 theirs_filter: &Option<Bloom<ObjectId>>,
191 ) -> Result<(), ObjectParseError> {
192 let id = cobj.id();
193
194 let found_in_filter = if let Some(filter) = theirs_filter {
198 filter.check(&id)
199 } else {
200 false
201 };
202
203 if !found_in_filter && !theirs.contains(&id) {
204 if let Some(past) = visited.get_mut(&id) {
205 if let Some(f) = future {
207 past.future.insert(f);
208 }
209 } else {
210 let mut new_node_to_insert = DagNode::new();
211 if let Some(f) = future {
212 new_node_to_insert.future.insert(f);
213 }
214 let pasts = cobj.acks_and_nacks();
215 new_node_to_insert.past.extend(pasts.iter().cloned());
216 visited.insert(id, new_node_to_insert);
217 for past_id in pasts {
218 match Object::load(past_id, None, store) {
219 Ok(o) => {
220 Self::load_causal_past(
221 &o,
222 store,
223 theirs,
224 visited,
225 missing,
226 Some(id),
227 theirs_found,
228 theirs_filter,
229 )?;
230 }
231 Err(ObjectParseError::MissingBlocks(blocks)) => {
232 missing.as_mut().map(|m| m.extend(blocks));
233 }
234 Err(e) => return Err(e),
235 }
236 }
237 }
238 } else if theirs_found.is_some() {
239 theirs_found.as_mut().unwrap().insert(id);
240 }
241 Ok(())
242 }
243
244 pub fn sync_req(
254 target_heads: impl Iterator<Item = ObjectId>,
255 known_heads: &[ObjectId],
256 known_commits: &Option<BloomFilter>,
257 store: &Store,
258 ) -> Result<Vec<ObjectId>, ObjectParseError> {
259 let mut theirs: HashMap<ObjectId, DagNode> = HashMap::new();
261
262 for id in known_heads {
264 if let Ok(cobj) = Object::load(*id, None, store) {
265 Self::load_causal_past(
266 &cobj,
267 store,
268 &HashSet::new(),
269 &mut theirs,
270 &mut None,
271 None,
272 &mut None,
273 &None,
274 )?;
275 }
276 }
278
279 let mut visited = HashMap::new();
282
283 let theirs: HashSet<ObjectId> = theirs.keys().into_iter().cloned().collect();
284
285 let filter = if let Some(filter) = known_commits.as_ref() {
286 Some(
287 serde_bare::from_slice(filter.filter())
288 .map_err(|_| ObjectParseError::FilterDeserializationError)?,
289 )
290 } else {
291 None
292 };
293
294 for id in target_heads {
297 if let Ok(cobj) = Object::load(id, None, store) {
298 Self::load_causal_past(
299 &cobj,
300 store,
301 &theirs,
302 &mut visited,
303 &mut None,
304 None,
305 &mut None,
306 &filter,
307 )?;
308 }
309 }
311
312 let mut next_generations = HashSet::new();
316 for (_, node) in visited.iter() {
317 for future in node.future.iter() {
318 next_generations.insert(future);
319 }
320 }
321 let all = HashSet::from_iter(visited.keys());
322 let first_generation = all.difference(&next_generations);
323
324 let mut already_in: HashSet<ObjectId> = HashSet::new();
325
326 let sub_dag_to_send_size = visited.len();
327 let mut result = Vec::with_capacity(sub_dag_to_send_size);
328 let dag_ids: HashSet<ObjectId> = visited.keys().cloned().collect();
329 for first in first_generation {
330 result.append(&mut DagNode::collapse(
331 first,
332 &visited,
333 &dag_ids,
334 &mut already_in,
335 ));
336 }
337 if result.len() != sub_dag_to_send_size || already_in.len() != sub_dag_to_send_size {
344 return Err(ObjectParseError::MalformedDag);
345 }
346
347 #[cfg(debug_assertions)]
348 for _res in result.iter() {
349 log_debug!("sending missing commit {}", _res);
350 }
351
352 Ok(result)
353 }
354}
355
356#[allow(unused_imports)]
357#[cfg(test)]
358mod test {
359
360 use crate::branch::*;
363
364 use crate::repo::Repo;
365
366 use crate::log::*;
367 use crate::store::Store;
368 use crate::utils::*;
369
370 #[test]
371 pub fn test_branch() {
372 fn add_obj(
373 content: ObjectContentV0,
374 header: Option<CommitHeader>,
375 store: &Store,
376 ) -> ObjectRef {
377 let max_object_size = 4000;
378 let mut obj = Object::new(ObjectContent::V0(content), header, max_object_size, store);
379 obj.save_in_test(store).unwrap();
380 obj.reference().unwrap()
381 }
382
383 fn add_commit(
384 branch: BranchId,
385 author_privkey: PrivKey,
386 author_pubkey: PubKey,
387 deps: Vec<ObjectRef>,
388 acks: Vec<ObjectRef>,
389 body_ref: ObjectRef,
390 store: &Store,
391 ) -> ObjectRef {
392 let header = CommitHeader::new_with_deps_and_acks(
393 deps.iter().map(|r| r.id).collect(),
394 acks.iter().map(|r| r.id).collect(),
395 );
396
397 let overlay = store.get_store_repo().overlay_id_for_read_purpose();
398
399 let obj_ref = ObjectRef {
400 id: ObjectId::Blake3Digest32([1; 32]),
401 key: SymKey::ChaCha20Key([2; 32]),
402 };
403 let refs = vec![obj_ref];
404 let metadata = vec![5u8; 55];
405
406 let commit = CommitV0::new(
407 &author_privkey,
408 &author_pubkey,
409 overlay,
410 branch,
411 QuorumType::NoSigning,
412 deps,
413 vec![],
414 acks,
415 vec![],
416 refs,
417 vec![],
418 metadata,
419 body_ref,
420 )
421 .unwrap();
422 add_obj(ObjectContentV0::Commit(Commit::V0(commit)), header, store)
424 }
425
426 fn add_body_branch(branch: BranchV0, store: &Store) -> ObjectRef {
427 let body: CommitBodyV0 = CommitBodyV0::Branch(Branch::V0(branch));
428 add_obj(
430 ObjectContentV0::CommitBody(CommitBody::V0(body)),
431 None,
432 store,
433 )
434 }
435
436 fn add_body_trans(header: Option<CommitHeader>, content: u8, store: &Store) -> ObjectRef {
437 let content = [content; 777].to_vec();
438 let body = CommitBodyV0::AsyncTransaction(Transaction::V0(content));
439 add_obj(
441 ObjectContentV0::CommitBody(CommitBody::V0(body)),
442 header,
443 store,
444 )
445 }
446
447 let (repo_privkey, repo_pubkey) = generate_keypair();
450 let store = Store::dummy_with_key(repo_pubkey);
451
452 let (_, branch_pubkey) = generate_keypair();
455
456 let (member_privkey, member_pubkey) = generate_keypair();
457
458 let metadata = [66u8; 64].to_vec();
459
460 let repo = Repo::new_with_member(
461 &repo_pubkey,
462 &member_pubkey,
463 &[PermissionV0::WriteAsync],
464 store,
465 );
466
467 let repo_ref = ObjectRef {
468 id: ObjectId::Blake3Digest32([1; 32]),
469 key: SymKey::ChaCha20Key([2; 32]),
470 };
471
472 let root_branch_def_id = ObjectId::Blake3Digest32([1; 32]);
473
474 let branch = BranchV0::new(
475 branch_pubkey,
476 repo_ref,
477 root_branch_def_id,
478 repo_privkey,
479 metadata,
480 );
481 fn print_branch() {
484 log_debug!("branch deps/acks:");
485 log_debug!("");
486 log_debug!(" br");
487 log_debug!(" / \\");
488 log_debug!(" t1 t2");
489 log_debug!(" \\ /");
490 log_debug!(" t4");
491 log_debug!(" |");
492 log_debug!(" t5");
493 log_debug!("");
494 }
495
496 print_branch();
497
498 let branch_body = add_body_branch(branch.clone(), &repo.store);
501
502 let trans_body = add_body_trans(None, 8, &repo.store);
503 let trans_body2 = add_body_trans(None, 9, &repo.store);
504
505 let br = add_commit(
508 branch_pubkey,
509 member_privkey.clone(),
510 member_pubkey,
511 vec![],
512 vec![],
513 branch_body.clone(),
514 &repo.store,
515 );
516 log_debug!(">> br {}", br.id);
517
518 let t1 = add_commit(
519 branch_pubkey,
520 member_privkey.clone(),
521 member_pubkey,
522 vec![],
523 vec![br.clone()],
524 trans_body.clone(),
525 &repo.store,
526 );
527 log_debug!(">> t1 {}", t1.id);
528
529 let t2 = add_commit(
530 branch_pubkey,
531 member_privkey.clone(),
532 member_pubkey,
533 vec![],
534 vec![br.clone()],
535 trans_body2.clone(),
536 &repo.store,
537 );
538 log_debug!(">> t2 {}", t2.id);
539
540 let t4 = add_commit(
541 branch_pubkey,
542 member_privkey.clone(),
543 member_pubkey,
544 vec![],
545 vec![t1.clone(), t2.clone()],
546 trans_body.clone(),
547 &repo.store,
548 );
549 log_debug!(">> t4 {}", t4.id);
550
551 let t5 = add_commit(
552 branch_pubkey,
553 member_privkey.clone(),
554 member_pubkey,
555 vec![],
556 vec![t4.clone()],
557 trans_body.clone(),
558 &repo.store,
559 );
560 log_debug!(">> t5 {}", t5.id);
561
562 let c5 = Commit::load(t5.clone(), &repo.store, true).unwrap();
563 c5.verify(&repo).unwrap();
564
565 let ids = Branch::sync_req([t5.id].into_iter(), &[t1.id], &None, &repo.store).unwrap();
578
579 assert_eq!(ids.len(), 3);
580 assert_eq!(ids, [t2.id, t4.id, t5.id]);
581 }
582}