ng_repo/
branch.rs

// Copyright (c) 2022-2024 Niko Bonnieure, Par le Peuple, NextGraph.org developers
// All rights reserved.
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
// at your option. All files in the project carrying such
// notice may not be copied, modified, or distributed except
// according to those terms.

//! Branch of a Repository

use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt;

use bloomfilter::Bloom;
use zeroize::Zeroize;

use crate::errors::*;
#[allow(unused_imports)]
use crate::log::*;
use crate::object::*;
use crate::store::Store;
use crate::types::*;
use crate::utils::encrypt_in_place;

impl BranchV0 {
    pub fn new(
        id: PubKey,
        repo: ObjectRef,
        root_branch_readcap_id: ObjectId,
        topic_priv: PrivKey,
        metadata: Vec<u8>,
    ) -> BranchV0 {
        //TODO: use encrypt_topic_priv_key
        let topic_privkey: Vec<u8> = vec![];
        let topic = topic_priv.to_pub();
        BranchV0 {
            id,
            content_type: BranchContentType::None,
            repo,
            root_branch_readcap_id,
            topic,
            topic_privkey,
            pulled_from: vec![],
            metadata,
        }
    }
}

#[allow(dead_code)]
#[derive(Debug)]
pub struct DagNode {
    pub future: HashSet<ObjectId>,
    pub past: HashSet<ObjectId>,
}

#[allow(dead_code)]
struct Dag<'a>(&'a HashMap<Digest, DagNode>);

impl fmt::Display for DagNode {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        for fu in self.future.iter() {
            write!(f, "{} ", fu)?;
        }
        Ok(())
    }
}

impl<'a> fmt::Display for Dag<'a> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        for (id, node) in self.0.iter() {
            writeln!(f, "ID: {} FUTURES: {}", id, node)?;
        }
        Ok(())
    }
}

impl DagNode {
    fn new() -> Self {
        Self {
            future: HashSet::new(),
            past: HashSet::new(),
        }
    }
    /// Linearizes, depth-first, the sub-DAG rooted at `id`, respecting the
    /// causal partial order: a node is emitted only once all of its parents
    /// that belong to the sub-DAG (`dag_ids`) are already in `already_in`.
    fn collapse(
        id: &ObjectId,
        dag: &HashMap<ObjectId, DagNode>,
        dag_ids: &HashSet<ObjectId>,
        already_in: &mut HashSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let this = dag.get(id).unwrap();
        let intersec = this
            .past
            .intersection(dag_ids)
            .cloned()
            .collect::<HashSet<ObjectId>>();
        if intersec.len() > 1 && !intersec.is_subset(already_in) {
            // some of its in-DAG parents haven't been emitted yet:
            // we postpone it; it will be reached again via its remaining parent(s)
            // log_debug!("postponed {}", id);
            vec![]
        } else {
            let mut res = vec![*id];
            already_in.insert(*id);
            for child in this.future.iter() {
                log_debug!("child of {} : {}", id, child);
                res.append(&mut Self::collapse(child, dag, dag_ids, already_in));
            }
            res
        }
    }
}
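
// An illustrative walk-through (an added note, not part of the upstream
// source): for the sub-DAG  t2 -> t4 -> t5  in which t4 also has a parent
// outside `dag_ids`, `collapse(&t2, ..)` emits t2, then recurses into its
// future t4; t4 has a single parent inside the sub-DAG, so it is emitted
// right away, then t5, yielding [t2, t4, t5] in causal order. Only a node
// with several in-DAG parents, not all emitted yet, is postponed until the
// walk reaches it again through its last parent.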

impl Branch {
    /// Encrypts the topic private key (a BranchWriteCapSecret) with a key derived as follows:
    /// BLAKE3 derive_key ("NextGraph Branch WriteCap Secret BLAKE3 key",
    ///                     RepoWriteCapSecret, TopicId, BranchId)
    /// so that only editors of the repo can decrypt the private key.
    /// nonce = 0
    fn encrypt_topic_priv_key(
        mut plaintext: Vec<u8>,
        topic_id: TopicId,
        branch_id: BranchId,
        repo_write_cap_secret: &RepoWriteCapSecret,
    ) -> Vec<u8> {
        let repo_write_cap_secret = serde_bare::to_vec(repo_write_cap_secret).unwrap();
        let topic_id = serde_bare::to_vec(&topic_id).unwrap();
        let branch_id = serde_bare::to_vec(&branch_id).unwrap();
        let mut key_material = [repo_write_cap_secret, topic_id, branch_id].concat();
        let mut key: [u8; 32] = blake3::derive_key(
            "NextGraph Branch WriteCap Secret BLAKE3 key",
            key_material.as_slice(),
        );
        encrypt_in_place(&mut plaintext, key, [0; 12]);
        key.zeroize();
        key_material.zeroize();
        plaintext
    }

    pub fn encrypt_branch_write_cap_secret(
        privkey: &BranchWriteCapSecret,
        topic_id: TopicId,
        branch_id: BranchId,
        repo_write_cap_secret: &RepoWriteCapSecret,
    ) -> Vec<u8> {
        let plaintext = serde_bare::to_vec(privkey).unwrap();
        Branch::encrypt_topic_priv_key(plaintext, topic_id, branch_id, repo_write_cap_secret)
    }

    pub fn decrypt_branch_write_cap_secret(
        ciphertext: Vec<u8>,
        topic_id: TopicId,
        branch_id: BranchId,
        repo_write_cap_secret: &RepoWriteCapSecret,
    ) -> Result<BranchWriteCapSecret, NgError> {
        // the cipher is symmetric and applied in place, so running the
        // encryption pass on the ciphertext recovers the plaintext
        let plaintext =
            Branch::encrypt_topic_priv_key(ciphertext, topic_id, branch_id, repo_write_cap_secret);
        Ok(serde_bare::from_slice(&plaintext)?)
    }
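
    // A minimal round-trip sketch (an illustrative addition, not part of the
    // upstream source), assuming `generate_keypair` as used in the tests
    // below, and assuming a `SymKey::random()` constructor is available:
    //
    //     let (topic_priv, topic_id) = generate_keypair();
    //     let (_, branch_id) = generate_keypair();
    //     let repo_write_cap_secret = SymKey::random();
    //     let cipher = Branch::encrypt_branch_write_cap_secret(
    //         &topic_priv, topic_id, branch_id, &repo_write_cap_secret);
    //     let clear = Branch::decrypt_branch_write_cap_secret(
    //         cipher, topic_id, branch_id, &repo_write_cap_secret).unwrap();
    //     assert_eq!(clear, topic_priv);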

    pub fn new(
        id: PubKey,
        repo: ObjectRef,
        root_branch_readcap_id: ObjectId,
        topic_priv: PrivKey,
        metadata: Vec<u8>,
    ) -> Branch {
        Branch::V0(BranchV0::new(
            id,
            repo,
            root_branch_readcap_id,
            topic_priv,
            metadata,
        ))
    }

    /// Loads the causal past of a Commit `cobj` in a `Branch` from the `Store`,
    /// and collects in `visited` the ObjectIds encountered on the way,
    /// stopping at any commit already belonging to `theirs` or at the root of the DAG.
    /// Optionally collects in `missing` the objects/blocks that couldn't be found locally on the way,
    /// and, also optionally, collects in `theirs_found` the commits of `theirs` encountered on the way.
    pub fn load_causal_past(
        cobj: &Object,
        store: &Store,
        theirs: &HashSet<ObjectId>,
        visited: &mut HashMap<ObjectId, DagNode>,
        missing: &mut Option<&mut HashSet<ObjectId>>,
        future: Option<ObjectId>,
        theirs_found: &mut Option<&mut HashSet<ObjectId>>,
        theirs_filter: &Option<Bloom<ObjectId>>,
    ) -> Result<(), ObjectParseError> {
        let id = cobj.id();

        // check if this commit object is present in theirs or has already been visited in the current walk.
        // load deps, stop at the root (including it in visited) or if this is a commit object from known_heads.

        let found_in_filter = if let Some(filter) = theirs_filter {
            filter.check(&id)
        } else {
            false
        };

        if !found_in_filter && !theirs.contains(&id) {
            if let Some(past) = visited.get_mut(&id) {
                // we update the future
                if let Some(f) = future {
                    past.future.insert(f);
                }
            } else {
                let mut new_node_to_insert = DagNode::new();
                if let Some(f) = future {
                    new_node_to_insert.future.insert(f);
                }
                let pasts = cobj.acks_and_nacks();
                new_node_to_insert.past.extend(pasts.iter().cloned());
                visited.insert(id, new_node_to_insert);
                for past_id in pasts {
                    match Object::load(past_id, None, store) {
                        Ok(o) => {
                            Self::load_causal_past(
                                &o,
                                store,
                                theirs,
                                visited,
                                missing,
                                Some(id),
                                theirs_found,
                                theirs_filter,
                            )?;
                        }
                        Err(ObjectParseError::MissingBlocks(blocks)) => {
                            if let Some(m) = missing.as_mut() {
                                m.extend(blocks);
                            }
                        }
                        Err(e) => return Err(e),
                    }
                }
            }
        } else if let Some(tf) = theirs_found.as_mut() {
            tf.insert(id);
        }
        Ok(())
    }
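
    // A minimal usage sketch (an illustrative addition, not part of the
    // upstream source): collecting the full causal past of a single head
    // while gathering missing blocks, assuming `head_obj` was loaded from
    // `store` beforehand:
    //
    //     let mut visited = HashMap::new();
    //     let mut missing = HashSet::new();
    //     Branch::load_causal_past(
    //         &head_obj,
    //         &store,
    //         &HashSet::new(),  // no "theirs": walk all the way to the root
    //         &mut visited,
    //         &mut Some(&mut missing),
    //         None,             // the head has no future commit yet
    //         &mut None,
    //         &None,
    //     )?;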

    /// Branch sync request from another peer.
    ///
    /// `target_heads` represents the list of heads the requester would like to reach. This list cannot be empty.
    /// If the requester doesn't know what to reach, the responder should fill this list with their own current local head;
    /// this is not done here, it should be done before, in the handling of incoming requests.
    /// `known_heads` represents the list of current heads at the requester's replica at the moment of the request.
    /// An empty list means the requester has an empty branch locally.
    ///
    /// Returns the ObjectIds to send, ordered with respect to the causal partial order.
    pub fn sync_req(
        target_heads: impl Iterator<Item = ObjectId>,
        known_heads: &[ObjectId],
        known_commits: &Option<BloomFilter>,
        store: &Store,
    ) -> Result<Vec<ObjectId>, ObjectParseError> {
        // their commits
        let mut theirs: HashMap<ObjectId, DagNode> = HashMap::new();

        // collect causal past of known_heads
        for id in known_heads {
            if let Ok(cobj) = Object::load(*id, None, store) {
                Self::load_causal_past(
                    &cobj,
                    store,
                    &HashSet::new(),
                    &mut theirs,
                    &mut None,
                    None,
                    &mut None,
                    &None,
                )?;
            }
            // we silently discard any load error on the known_heads, as the responder might not know them (yet)
        }

        //log_debug!("their causal past \n{}", Dag(&theirs));

        let mut visited = HashMap::new();

        let theirs: HashSet<ObjectId> = theirs.keys().cloned().collect();

        let filter = if let Some(filter) = known_commits.as_ref() {
            Some(
                serde_bare::from_slice(filter.filter())
                    .map_err(|_| ObjectParseError::FilterDeserializationError)?,
            )
        } else {
            None
        };

        // collect all commits reachable from target_heads,
        // up to the root or until encountering a commit from theirs
        for id in target_heads {
            if let Ok(cobj) = Object::load(id, None, store) {
                Self::load_causal_past(
                    &cobj,
                    store,
                    &theirs,
                    &mut visited,
                    &mut None,
                    None,
                    &mut None,
                    &filter,
                )?;
            }
            // we silently discard any load error on the target_heads, as they can be wrong if the requester is confused about what the responder has locally
        }

        //log_debug!("what we have here \n{}", Dag(&visited));

        // now ordering to respect the causal partial order
        let mut next_generations = HashSet::new();
        for node in visited.values() {
            for future in node.future.iter() {
                next_generations.insert(future);
            }
        }
        let all = HashSet::from_iter(visited.keys());
        let first_generation = all.difference(&next_generations);

        let mut already_in: HashSet<ObjectId> = HashSet::new();

        let sub_dag_to_send_size = visited.len();
        let mut result = Vec::with_capacity(sub_dag_to_send_size);
        let dag_ids: HashSet<ObjectId> = visited.keys().cloned().collect();
        for first in first_generation {
            result.append(&mut DagNode::collapse(
                first,
                &visited,
                &dag_ids,
                &mut already_in,
            ));
        }
        // log_debug!(
        //     "DAG {} {} {}",
        //     result.len(),
        //     sub_dag_to_send_size,
        //     already_in.len()
        // );
        if result.len() != sub_dag_to_send_size || already_in.len() != sub_dag_to_send_size {
            return Err(ObjectParseError::MalformedDag);
        }

        #[cfg(debug_assertions)]
        for _res in result.iter() {
            log_debug!("sending missing commit {}", _res);
        }

        Ok(result)
    }
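
    // Responder-side usage sketch (an illustrative addition, not part of the
    // upstream source), assuming `target_heads` and `known_heads` were taken
    // from an incoming sync request:
    //
    //     let to_send = Branch::sync_req(
    //         target_heads.into_iter(),
    //         &known_heads,
    //         &None, // no Bloom filter of known commits was provided
    //         &store,
    //     )?;
    //     // `to_send` is ordered so that every commit precedes its futures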
}

#[allow(unused_imports)]
#[cfg(test)]
mod test {

    //use bloomfilter::Bloom;

    use crate::branch::*;

    use crate::repo::Repo;

    use crate::log::*;
    use crate::store::Store;
    use crate::utils::*;

    #[test]
    pub fn test_branch() {
        fn add_obj(
            content: ObjectContentV0,
            header: Option<CommitHeader>,
            store: &Store,
        ) -> ObjectRef {
            let max_object_size = 4000;
            let mut obj = Object::new(ObjectContent::V0(content), header, max_object_size, store);
            obj.save_in_test(store).unwrap();
            obj.reference().unwrap()
        }

        fn add_commit(
            branch: BranchId,
            author_privkey: PrivKey,
            author_pubkey: PubKey,
            deps: Vec<ObjectRef>,
            acks: Vec<ObjectRef>,
            body_ref: ObjectRef,
            store: &Store,
        ) -> ObjectRef {
            let header = CommitHeader::new_with_deps_and_acks(
                deps.iter().map(|r| r.id).collect(),
                acks.iter().map(|r| r.id).collect(),
            );

            let overlay = store.get_store_repo().overlay_id_for_read_purpose();

            let obj_ref = ObjectRef {
                id: ObjectId::Blake3Digest32([1; 32]),
                key: SymKey::ChaCha20Key([2; 32]),
            };
            let refs = vec![obj_ref];
            let metadata = vec![5u8; 55];

            let commit = CommitV0::new(
                &author_privkey,
                &author_pubkey,
                overlay,
                branch,
                QuorumType::NoSigning,
                deps,
                vec![],
                acks,
                vec![],
                refs,
                vec![],
                metadata,
                body_ref,
            )
            .unwrap();
            //log_debug!("commit: {:?}", commit);
            add_obj(ObjectContentV0::Commit(Commit::V0(commit)), header, store)
        }

        fn add_body_branch(branch: BranchV0, store: &Store) -> ObjectRef {
            let body: CommitBodyV0 = CommitBodyV0::Branch(Branch::V0(branch));
            //log_debug!("body: {:?}", body);
            add_obj(
                ObjectContentV0::CommitBody(CommitBody::V0(body)),
                None,
                store,
            )
        }

        fn add_body_trans(header: Option<CommitHeader>, content: u8, store: &Store) -> ObjectRef {
            let content = [content; 777].to_vec();
            let body = CommitBodyV0::AsyncTransaction(Transaction::V0(content));
            //log_debug!("body: {:?}", body);
            add_obj(
                ObjectContentV0::CommitBody(CommitBody::V0(body)),
                header,
                store,
            )
        }

        // repo

        let (repo_privkey, repo_pubkey) = generate_keypair();
        let store = Store::dummy_with_key(repo_pubkey);

        // branch

        let (_, branch_pubkey) = generate_keypair();

        let (member_privkey, member_pubkey) = generate_keypair();

        let metadata = [66u8; 64].to_vec();

        let repo = Repo::new_with_member(
            &repo_pubkey,
            &member_pubkey,
            &[PermissionV0::WriteAsync],
            store,
        );

        let repo_ref = ObjectRef {
            id: ObjectId::Blake3Digest32([1; 32]),
            key: SymKey::ChaCha20Key([2; 32]),
        };

        let root_branch_def_id = ObjectId::Blake3Digest32([1; 32]);

        let branch = BranchV0::new(
            branch_pubkey,
            repo_ref,
            root_branch_def_id,
            repo_privkey,
            metadata,
        );
        //log_debug!("branch: {:?}", branch);

        fn print_branch() {
            log_debug!("branch deps/acks:");
            log_debug!("");
            log_debug!("     br");
            log_debug!("    /  \\");
            log_debug!("  t1   t2");
            log_debug!("    \\  /");
            log_debug!("     t4");
            log_debug!("      |");
            log_debug!("     t5");
            log_debug!("");
        }

        print_branch();

        // commit bodies

        let branch_body = add_body_branch(branch.clone(), &repo.store);

        let trans_body = add_body_trans(None, 8, &repo.store);
        let trans_body2 = add_body_trans(None, 9, &repo.store);

        // create & add commits to store

        let br = add_commit(
            branch_pubkey,
            member_privkey.clone(),
            member_pubkey,
            vec![],
            vec![],
            branch_body.clone(),
            &repo.store,
        );
        log_debug!(">> br {}", br.id);

        let t1 = add_commit(
            branch_pubkey,
            member_privkey.clone(),
            member_pubkey,
            vec![],
            vec![br.clone()],
            trans_body.clone(),
            &repo.store,
        );
        log_debug!(">> t1 {}", t1.id);

        let t2 = add_commit(
            branch_pubkey,
            member_privkey.clone(),
            member_pubkey,
            vec![],
            vec![br.clone()],
            trans_body2.clone(),
            &repo.store,
        );
        log_debug!(">> t2 {}", t2.id);

        let t4 = add_commit(
            branch_pubkey,
            member_privkey.clone(),
            member_pubkey,
            vec![],
            vec![t1.clone(), t2.clone()],
            trans_body.clone(),
            &repo.store,
        );
        log_debug!(">> t4 {}", t4.id);

        let t5 = add_commit(
            branch_pubkey,
            member_privkey.clone(),
            member_pubkey,
            vec![],
            vec![t4.clone()],
            trans_body.clone(),
            &repo.store,
        );
        log_debug!(">> t5 {}", t5.id);

        let c5 = Commit::load(t5.clone(), &repo.store, true).unwrap();
        c5.verify(&repo).unwrap();

        // let mut filter = Filter::new(FilterBuilder::new(10, 0.01));
        // for commit_ref in [br, t1, t2, t5.clone(), a6.clone()] {
        //     match commit_ref.id {
        //         ObjectId::Blake3Digest32(d) => filter.add(&d),
        //     }
        // }
        // let cfg = filter.config();
        // let their_commits = BloomFilter {
        //     k: cfg.hashes,
        //     f: filter.get_u8_array().to_vec(),
        // };
        // the requester already has t1 (and thus its causal past br),
        // so the responder must send t2, t4 and t5, in causal order
        let ids = Branch::sync_req([t5.id].into_iter(), &[t1.id], &None, &repo.store).unwrap();

        assert_eq!(ids.len(), 3);
        assert_eq!(ids, [t2.id, t4.id, t5.id]);
    }
}