1use std::collections::HashMap;
13use std::collections::HashSet;
14use std::fmt;
15
16use sbbf_rs_safe::Filter;
17use zeroize::Zeroize;
18
19use crate::errors::*;
20#[allow(unused_imports)]
21use crate::log::*;
22use crate::object::*;
23use crate::store::Store;
24use crate::types::*;
25use crate::utils::encrypt_in_place;
26
27impl BranchV0 {
28 pub fn new(
29 id: PubKey,
30 repo: ObjectRef,
31 root_branch_readcap_id: ObjectId,
32 topic_priv: PrivKey,
33 metadata: Vec<u8>,
34 ) -> BranchV0 {
35 let topic_privkey: Vec<u8> = vec![];
36 let topic = topic_priv.to_pub();
38 BranchV0 {
39 id,
40 crdt: BranchCrdt::None,
41 repo,
42 root_branch_readcap_id,
43 topic,
44 topic_privkey,
45 pulled_from: vec![],
46 metadata,
47 }
48 }
49}
50
51#[allow(dead_code)]
52#[derive(Debug)]
53pub struct DagNode {
54 pub future: HashSet<ObjectId>,
55 pub past: HashSet<ObjectId>,
56}
57
58#[allow(dead_code)]
59struct Dag<'a>(&'a HashMap<Digest, DagNode>);
60
61impl fmt::Display for DagNode {
62 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63 for fu in self.future.iter() {
64 write!(f, "{} ", fu)?;
65 }
66 Ok(())
67 }
68}
69
70impl<'a> fmt::Display for Dag<'a> {
71 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
72 for node in self.0.iter() {
73 writeln!(f, "ID: {} FUTURES: {}", node.0, node.1)?;
74 }
75 Ok(())
76 }
77}
78
79impl DagNode {
80 fn new() -> Self {
81 Self {
82 future: HashSet::new(),
83 past: HashSet::new(),
84 }
85 }
86 fn collapse(
87 id: &ObjectId,
88 dag: &HashMap<ObjectId, DagNode>,
89 dag_ids: &HashSet<ObjectId>,
90 already_in: &mut HashSet<ObjectId>,
91 ) -> Vec<ObjectId> {
92 let this = dag.get(id).unwrap();
93 let intersec = this
94 .past
95 .intersection(dag_ids)
96 .cloned()
97 .collect::<HashSet<ObjectId>>();
98 if intersec.len() > 1 && !intersec.is_subset(already_in) {
99 vec![]
102 } else {
103 let mut res = vec![*id];
104 already_in.insert(*id);
105 for child in this.future.iter() {
106 res.append(&mut Self::collapse(child, dag, dag_ids, already_in));
108 }
109 res
110 }
111 }
112}
113
114impl Branch {
115 fn encrypt_topic_priv_key(
121 mut plaintext: Vec<u8>,
122 topic_id: TopicId,
123 branch_id: BranchId,
124 repo_write_cap_secret: &RepoWriteCapSecret,
125 ) -> Vec<u8> {
126 let repo_write_cap_secret = serde_bare::to_vec(repo_write_cap_secret).unwrap();
127 let topic_id = serde_bare::to_vec(&topic_id).unwrap();
128 let branch_id = serde_bare::to_vec(&branch_id).unwrap();
129 let mut key_material = [repo_write_cap_secret, topic_id, branch_id].concat();
130 let mut key: [u8; 32] = blake3::derive_key(
131 "NextGraph Branch WriteCap Secret BLAKE3 key",
132 key_material.as_slice(),
133 );
134 encrypt_in_place(&mut plaintext, key, [0; 12]);
135 key.zeroize();
136 key_material.zeroize();
137 plaintext
138 }
139
140 pub fn encrypt_branch_write_cap_secret(
141 privkey: &BranchWriteCapSecret,
142 topic_id: TopicId,
143 branch_id: BranchId,
144 repo_write_cap_secret: &RepoWriteCapSecret,
145 ) -> Vec<u8> {
146 let plaintext = serde_bare::to_vec(privkey).unwrap();
147 Branch::encrypt_topic_priv_key(plaintext, topic_id, branch_id, repo_write_cap_secret)
148 }
149
150 pub fn decrypt_branch_write_cap_secret(
151 ciphertext: Vec<u8>,
152 topic_id: TopicId,
153 branch_id: BranchId,
154 repo_write_cap_secret: &RepoWriteCapSecret,
155 ) -> Result<BranchWriteCapSecret, NgError> {
156 let plaintext =
157 Branch::encrypt_topic_priv_key(ciphertext, topic_id, branch_id, repo_write_cap_secret);
158 Ok(serde_bare::from_slice(&plaintext)?)
159 }
160
161 pub fn new(
162 id: PubKey,
163 repo: ObjectRef,
164 root_branch_readcap_id: ObjectId,
165 topic_priv: PrivKey,
166 metadata: Vec<u8>,
167 ) -> Branch {
168 Branch::V0(BranchV0::new(
169 id,
170 repo,
171 root_branch_readcap_id,
172 topic_priv,
173 metadata,
174 ))
175 }
176
177 pub fn load_causal_past(
183 recursor: &mut Vec<(ObjectId, Option<ObjectId>)>,
184 store: &Store,
185 theirs: &HashSet<ObjectId>,
186 visited: &mut HashMap<ObjectId, DagNode>,
187 missing: &mut Option<&mut HashSet<ObjectId>>,
188 theirs_found: &mut Option<&mut HashSet<ObjectId>>,
189 theirs_filter: &Option<Filter>,
190 ) -> Result<(), ObjectParseError> {
191 while let Some((id, future)) = recursor.pop() {
192 match Object::load(id, None, store) {
193 Ok(cobj) => {
194 let id = cobj.id();
195
196 let mut found_in_theirs = theirs.contains(&id);
200 if !found_in_theirs {
201 found_in_theirs = if let Some(filter) = theirs_filter {
202 let hash = id.get_hash();
203 filter.contains_hash(hash)
204 } else {
205 false
206 };
207 }
208
209 if found_in_theirs {
210 if theirs_found.is_some() {
211 theirs_found.as_mut().unwrap().insert(id);
212 }
213 } else {
214 if let Some(past) = visited.get_mut(&id) {
215 if let Some(f) = future {
217 past.future.insert(f);
218 }
219 } else {
220 let mut new_node_to_insert = DagNode::new();
221 if let Some(f) = future {
222 new_node_to_insert.future.insert(f);
223 }
224 let pasts = cobj.acks_and_nacks();
225 new_node_to_insert.past.extend(pasts.iter().cloned());
226 visited.insert(id, new_node_to_insert);
227 recursor.extend(pasts.into_iter().map(|past_id| (past_id, Some(id))));
228 }
248 }
249 }
250 Err(ObjectParseError::MissingBlocks(blocks)) => {
251 if future.is_some() {
252 missing.as_mut().map(|m| m.extend(blocks));
253 }
254 }
255 Err(e) => {
256 if future.is_some() {
257 return Err(e);
258 }
259 }
260 }
261 }
262 Ok(())
263 }
264
265 pub fn sync_req(
275 target_heads: impl Iterator<Item = ObjectId>,
276 known_heads: &[ObjectId],
277 known_commits: &Option<BloomFilter>,
278 store: &Store,
279 ) -> Result<Vec<ObjectId>, ObjectParseError> {
280 let mut theirs: HashMap<ObjectId, DagNode> = HashMap::new();
282
283 let mut recursor: Vec<(ObjectId, Option<ObjectId>)> =
285 known_heads.iter().map(|h| (h.clone(), None)).collect();
286 Self::load_causal_past(
289 &mut recursor,
290 store,
291 &HashSet::new(),
292 &mut theirs,
293 &mut None,
294 &mut None,
295 &None,
296 )?;
297
298 let mut visited = HashMap::new();
301
302 let theirs: HashSet<ObjectId> = theirs.keys().into_iter().cloned().collect();
303
304 let filter = if let Some(filter) = known_commits.as_ref() {
305 Some(
306 filter.filter(), )
308 } else {
309 None
310 };
311
312 let mut recursor: Vec<(ObjectId, Option<ObjectId>)> =
313 target_heads.map(|h| (h.clone(), None)).collect();
314 Self::load_causal_past(
318 &mut recursor,
319 store,
320 &theirs,
321 &mut visited,
322 &mut None,
323 &mut None,
324 &filter,
325 )?;
326 let mut next_generations = HashSet::new();
346 for (_, node) in visited.iter() {
347 for future in node.future.iter() {
348 next_generations.insert(future);
349 }
350 }
351 let all = HashSet::from_iter(visited.keys());
352 let first_generation = all.difference(&next_generations);
353
354 let mut already_in: HashSet<ObjectId> = HashSet::new();
355
356 let sub_dag_to_send_size = visited.len();
357 let mut result = Vec::with_capacity(sub_dag_to_send_size);
358 let dag_ids: HashSet<ObjectId> = visited.keys().cloned().collect();
359 for first in first_generation {
360 result.append(&mut DagNode::collapse(
361 first,
362 &visited,
363 &dag_ids,
364 &mut already_in,
365 ));
366 }
367 if result.len() != sub_dag_to_send_size || already_in.len() != sub_dag_to_send_size {
374 return Err(ObjectParseError::MalformedDag);
375 }
376
377 #[cfg(debug_assertions)]
378 for _res in result.iter() {
379 log_debug!("sending missing commit {}", _res);
380 }
381
382 Ok(result)
383 }
384}
385
386#[allow(unused_imports)]
387#[cfg(test)]
388mod test {
389
390 use crate::branch::*;
393
394 use crate::repo::Repo;
395
396 use crate::log::*;
397 use crate::store::Store;
398 use crate::utils::*;
399
400 #[test]
401 pub fn test_branch() {
402 fn add_obj(
403 content: ObjectContentV0,
404 header: Option<CommitHeader>,
405 store: &Store,
406 ) -> ObjectRef {
407 let max_object_size = 4000;
408 let mut obj = Object::new(ObjectContent::V0(content), header, max_object_size, store);
409 obj.save_in_test(store).unwrap();
410 obj.reference().unwrap()
411 }
412
413 fn add_commit(
414 branch: BranchId,
415 author_privkey: PrivKey,
416 author_pubkey: PubKey,
417 deps: Vec<ObjectRef>,
418 acks: Vec<ObjectRef>,
419 body_ref: ObjectRef,
420 store: &Store,
421 ) -> ObjectRef {
422 let header = CommitHeader::new_with_deps_and_acks(
423 deps.iter().map(|r| r.id).collect(),
424 acks.iter().map(|r| r.id).collect(),
425 );
426
427 let overlay = store.get_store_repo().overlay_id_for_read_purpose();
428
429 let obj_ref = ObjectRef {
430 id: ObjectId::Blake3Digest32([1; 32]),
431 key: SymKey::ChaCha20Key([2; 32]),
432 };
433 let refs = vec![obj_ref];
434 let metadata = vec![5u8; 55];
435
436 let commit = CommitV0::new(
437 &author_privkey,
438 &author_pubkey,
439 overlay,
440 branch,
441 QuorumType::NoSigning,
442 deps,
443 vec![],
444 acks,
445 vec![],
446 refs,
447 vec![],
448 metadata,
449 body_ref,
450 )
451 .unwrap();
452 add_obj(ObjectContentV0::Commit(Commit::V0(commit)), header, store)
454 }
455
456 fn add_body_branch(branch: BranchV0, store: &Store) -> ObjectRef {
457 let body: CommitBodyV0 = CommitBodyV0::Branch(Branch::V0(branch));
458 add_obj(
460 ObjectContentV0::CommitBody(CommitBody::V0(body)),
461 None,
462 store,
463 )
464 }
465
466 fn add_body_trans(header: Option<CommitHeader>, content: u8, store: &Store) -> ObjectRef {
467 let content = [content; 777].to_vec();
468 let body = CommitBodyV0::AsyncTransaction(Transaction::V0(content));
469 add_obj(
471 ObjectContentV0::CommitBody(CommitBody::V0(body)),
472 header,
473 store,
474 )
475 }
476
477 let (repo_privkey, repo_pubkey) = generate_keypair();
480 let store = Store::dummy_with_key(repo_pubkey);
481
482 let (_, branch_pubkey) = generate_keypair();
485
486 let (member_privkey, member_pubkey) = generate_keypair();
487
488 let metadata = [66u8; 64].to_vec();
489
490 let repo = Repo::new_with_member(
491 &repo_pubkey,
492 &member_pubkey,
493 &[PermissionV0::WriteAsync],
494 store,
495 );
496
497 let repo_ref = ObjectRef {
498 id: ObjectId::Blake3Digest32([1; 32]),
499 key: SymKey::ChaCha20Key([2; 32]),
500 };
501
502 let root_branch_def_id = ObjectId::Blake3Digest32([1; 32]);
503
504 let branch = BranchV0::new(
505 branch_pubkey,
506 repo_ref,
507 root_branch_def_id,
508 repo_privkey,
509 metadata,
510 );
511 fn print_branch() {
514 log_debug!("branch deps/acks:");
515 log_debug!("");
516 log_debug!(" br");
517 log_debug!(" / \\");
518 log_debug!(" t1 t2");
519 log_debug!(" \\ /");
520 log_debug!(" t4");
521 log_debug!(" |");
522 log_debug!(" t5");
523 log_debug!("");
524 }
525
526 print_branch();
527
528 let branch_body = add_body_branch(branch.clone(), &repo.store);
531
532 let trans_body = add_body_trans(None, 8, &repo.store);
533 let trans_body2 = add_body_trans(None, 9, &repo.store);
534
535 let br = add_commit(
538 branch_pubkey,
539 member_privkey.clone(),
540 member_pubkey,
541 vec![],
542 vec![],
543 branch_body.clone(),
544 &repo.store,
545 );
546 log_debug!(">> br {}", br.id);
547
548 let t1 = add_commit(
549 branch_pubkey,
550 member_privkey.clone(),
551 member_pubkey,
552 vec![],
553 vec![br.clone()],
554 trans_body.clone(),
555 &repo.store,
556 );
557 log_debug!(">> t1 {}", t1.id);
558
559 let t2 = add_commit(
560 branch_pubkey,
561 member_privkey.clone(),
562 member_pubkey,
563 vec![],
564 vec![br.clone()],
565 trans_body2.clone(),
566 &repo.store,
567 );
568 log_debug!(">> t2 {}", t2.id);
569
570 let t4 = add_commit(
571 branch_pubkey,
572 member_privkey.clone(),
573 member_pubkey,
574 vec![],
575 vec![t1.clone(), t2.clone()],
576 trans_body.clone(),
577 &repo.store,
578 );
579 log_debug!(">> t4 {}", t4.id);
580
581 let t5 = add_commit(
582 branch_pubkey,
583 member_privkey.clone(),
584 member_pubkey,
585 vec![],
586 vec![t4.clone()],
587 trans_body.clone(),
588 &repo.store,
589 );
590 log_debug!(">> t5 {}", t5.id);
591
592 let c5 = Commit::load(t5.clone(), &repo.store, true).unwrap();
593 c5.verify(&repo).unwrap();
594
595 let ids = Branch::sync_req([t5.id].into_iter(), &[t1.id], &None, &repo.store).unwrap();
608
609 assert_eq!(ids.len(), 3);
610 assert_eq!(ids, [t2.id, t4.id, t5.id]);
611 }
612}