// ng_repo/branch.rs
1// Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers
2// All rights reserved.
3// Licensed under the Apache License, Version 2.0
4// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
5// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
6// at your option. All files in the project carrying such
7// notice may not be copied, modified, or distributed except
8// according to those terms.
9
10//! Branch of a Repository
11
12use std::collections::HashMap;
13use std::collections::HashSet;
14use std::fmt;
15
16use sbbf_rs_safe::Filter;
17use zeroize::Zeroize;
18
19use crate::errors::*;
20#[allow(unused_imports)]
21use crate::log::*;
22use crate::object::*;
23use crate::store::Store;
24use crate::types::*;
25use crate::utils::encrypt_in_place;
26
27impl BranchV0 {
28    pub fn new(
29        id: PubKey,
30        repo: ObjectRef,
31        root_branch_readcap_id: ObjectId,
32        topic_priv: PrivKey,
33        metadata: Vec<u8>,
34    ) -> BranchV0 {
35        let topic_privkey: Vec<u8> = vec![];
36        //TODO: use encrypt_topic_priv_key
37        let topic = topic_priv.to_pub();
38        BranchV0 {
39            id,
40            crdt: BranchCrdt::None,
41            repo,
42            root_branch_readcap_id,
43            topic,
44            topic_privkey,
45            pulled_from: vec![],
46            metadata,
47        }
48    }
49}
50
/// A node of the in-memory commit DAG built while walking the causal past.
#[allow(dead_code)]
#[derive(Debug)]
pub struct DagNode {
    // Ids of the commits seen referencing this one as part of their past.
    pub future: HashSet<ObjectId>,
    // Ids taken from the commit's acks and nacks (its causal past).
    pub past: HashSet<ObjectId>,
}
57
/// Display adapter over a whole DAG (map of object id -> node), used for debug logging.
#[allow(dead_code)]
struct Dag<'a>(&'a HashMap<Digest, DagNode>);
60
61impl fmt::Display for DagNode {
62    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63        for fu in self.future.iter() {
64            write!(f, "{} ", fu)?;
65        }
66        Ok(())
67    }
68}
69
70impl<'a> fmt::Display for Dag<'a> {
71    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
72        for node in self.0.iter() {
73            writeln!(f, "ID: {} FUTURES: {}", node.0, node.1)?;
74        }
75        Ok(())
76    }
77}
78
impl DagNode {
    // Creates an empty node with no known future or past commits yet.
    fn new() -> Self {
        Self {
            future: HashSet::new(),
            past: HashSet::new(),
        }
    }
    /// Depth-first linearization of the sub-DAG reachable from `id`,
    /// respecting the causal partial order: a commit with several pasts inside
    /// `dag_ids` is postponed until all of them appear in `already_in`
    /// (it will be reached again through its last emitted past).
    ///
    /// `dag` is the full DAG, `dag_ids` the set of its keys, and `already_in`
    /// accumulates the ids emitted so far across all calls.
    /// Returns the ordered list of ids emitted by this call.
    fn collapse(
        id: &ObjectId,
        dag: &HashMap<ObjectId, DagNode>,
        dag_ids: &HashSet<ObjectId>,
        already_in: &mut HashSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let this = dag.get(id).unwrap();
        // Pasts of this commit that belong to the DAG being collapsed.
        let intersec = this
            .past
            .intersection(dag_ids)
            .cloned()
            .collect::<HashSet<ObjectId>>();
        if intersec.len() > 1 && !intersec.is_subset(already_in) {
            // we postpone it: not all of its in-DAG pasts have been emitted yet
            // log_debug!("postponed {}", id);
            vec![]
        } else {
            // Emit this commit, then recurse into its futures (children).
            let mut res = vec![*id];
            already_in.insert(*id);
            for child in this.future.iter() {
                // log_debug!("child of {} : {}", id, child);
                res.append(&mut Self::collapse(child, dag, dag_ids, already_in));
            }
            res
        }
    }
}
113
114impl Branch {
115    /// topic private key (a BranchWriteCapSecret), encrypted with a key derived as follow
116    /// BLAKE3 derive_key ("NextGraph Branch WriteCap Secret BLAKE3 key",
117    ///                                        RepoWriteCapSecret, TopicId, BranchId )
118    /// so that only editors of the repo can decrypt the privkey
119    /// nonce = 0
120    fn encrypt_topic_priv_key(
121        mut plaintext: Vec<u8>,
122        topic_id: TopicId,
123        branch_id: BranchId,
124        repo_write_cap_secret: &RepoWriteCapSecret,
125    ) -> Vec<u8> {
126        let repo_write_cap_secret = serde_bare::to_vec(repo_write_cap_secret).unwrap();
127        let topic_id = serde_bare::to_vec(&topic_id).unwrap();
128        let branch_id = serde_bare::to_vec(&branch_id).unwrap();
129        let mut key_material = [repo_write_cap_secret, topic_id, branch_id].concat();
130        let mut key: [u8; 32] = blake3::derive_key(
131            "NextGraph Branch WriteCap Secret BLAKE3 key",
132            key_material.as_slice(),
133        );
134        encrypt_in_place(&mut plaintext, key, [0; 12]);
135        key.zeroize();
136        key_material.zeroize();
137        plaintext
138    }
139
140    pub fn encrypt_branch_write_cap_secret(
141        privkey: &BranchWriteCapSecret,
142        topic_id: TopicId,
143        branch_id: BranchId,
144        repo_write_cap_secret: &RepoWriteCapSecret,
145    ) -> Vec<u8> {
146        let plaintext = serde_bare::to_vec(privkey).unwrap();
147        Branch::encrypt_topic_priv_key(plaintext, topic_id, branch_id, repo_write_cap_secret)
148    }
149
150    pub fn decrypt_branch_write_cap_secret(
151        ciphertext: Vec<u8>,
152        topic_id: TopicId,
153        branch_id: BranchId,
154        repo_write_cap_secret: &RepoWriteCapSecret,
155    ) -> Result<BranchWriteCapSecret, NgError> {
156        let plaintext =
157            Branch::encrypt_topic_priv_key(ciphertext, topic_id, branch_id, repo_write_cap_secret);
158        Ok(serde_bare::from_slice(&plaintext)?)
159    }
160
161    pub fn new(
162        id: PubKey,
163        repo: ObjectRef,
164        root_branch_readcap_id: ObjectId,
165        topic_priv: PrivKey,
166        metadata: Vec<u8>,
167    ) -> Branch {
168        Branch::V0(BranchV0::new(
169            id,
170            repo,
171            root_branch_readcap_id,
172            topic_priv,
173            metadata,
174        ))
175    }
176
177    /// Load causal past of a Commit `cobj` in a `Branch` from the `Store`,
178    ///
179    /// and collect in `visited` the ObjectIds encountered on the way, stopping at any commit already belonging to `theirs` or the root of DAG.
180    /// optionally collecting the missing objects/blocks that couldn't be found locally on the way,
181    /// and also optionally, collecting the commits of `theirs` found on the way
182    pub fn load_causal_past(
183        recursor: &mut Vec<(ObjectId, Option<ObjectId>)>,
184        store: &Store,
185        theirs: &HashSet<ObjectId>,
186        visited: &mut HashMap<ObjectId, DagNode>,
187        missing: &mut Option<&mut HashSet<ObjectId>>,
188        theirs_found: &mut Option<&mut HashSet<ObjectId>>,
189        theirs_filter: &Option<Filter>,
190    ) -> Result<(), ObjectParseError> {
191        while let Some((id, future)) = recursor.pop() {
192            match Object::load(id, None, store) {
193                Ok(cobj) => {
194                    let id = cobj.id();
195
196                    // check if this commit object is present in theirs or has already been visited in the current walk
197                    // load deps, stop at the root(including it in visited) or if this is a commit object from known_heads
198
199                    let mut found_in_theirs = theirs.contains(&id);
200                    if !found_in_theirs {
201                        found_in_theirs = if let Some(filter) = theirs_filter {
202                            let hash = id.get_hash();
203                            filter.contains_hash(hash)
204                        } else {
205                            false
206                        };
207                    }
208
209                    if found_in_theirs {
210                        if theirs_found.is_some() {
211                            theirs_found.as_mut().unwrap().insert(id);
212                        }
213                    } else {
214                        if let Some(past) = visited.get_mut(&id) {
215                            // we update the future
216                            if let Some(f) = future {
217                                past.future.insert(f);
218                            }
219                        } else {
220                            let mut new_node_to_insert = DagNode::new();
221                            if let Some(f) = future {
222                                new_node_to_insert.future.insert(f);
223                            }
224                            let pasts = cobj.acks_and_nacks();
225                            new_node_to_insert.past.extend(pasts.iter().cloned());
226                            visited.insert(id, new_node_to_insert);
227                            recursor.extend(pasts.into_iter().map(|past_id| (past_id, Some(id))));
228                            // for past_id in pasts {
229                            //     match Object::load(past_id, None, store) {
230                            //         Ok(o) => {
231                            //             Self::load_causal_past(
232                            //                 recursor,
233                            //                 store,
234                            //                 theirs,
235                            //                 visited,
236                            //                 missing,
237                            //                 theirs_found,
238                            //                 theirs_filter,
239                            //             )?;
240                            //         }
241                            //         Err(ObjectParseError::MissingBlocks(blocks)) => {
242                            //             missing.as_mut().map(|m| m.extend(blocks));
243                            //         }
244                            //         Err(e) => return Err(e),
245                            //     }
246                            // }
247                        }
248                    }
249                }
250                Err(ObjectParseError::MissingBlocks(blocks)) => {
251                    if future.is_some() {
252                        missing.as_mut().map(|m| m.extend(blocks));
253                    }
254                }
255                Err(e) => {
256                    if future.is_some() {
257                        return Err(e);
258                    }
259                }
260            }
261        }
262        Ok(())
263    }
264
265    /// Branch sync request from another peer
266    ///
267    /// `target_heads` represents the list of heads the requester would like to reach. this list cannot be empty.
268    ///  if the requester doesn't know what to reach, the responder should fill this list with their own current local head.
269    ///  this is not done here. it should be done before, in the handling of incoming requests.
270    /// `known_heads` represents the list of current heads at the requester replica at the moment of request.
271    ///  an empty list means the requester has an empty branch locally
272    ///
273    /// Return ObjectIds to send, ordered in respect of causal partial order
274    pub fn sync_req(
275        target_heads: impl Iterator<Item = ObjectId>,
276        known_heads: &[ObjectId],
277        known_commits: &Option<BloomFilter>,
278        store: &Store,
279    ) -> Result<Vec<ObjectId>, ObjectParseError> {
280        // their commits
281        let mut theirs: HashMap<ObjectId, DagNode> = HashMap::new();
282
283        //
284        let mut recursor: Vec<(ObjectId, Option<ObjectId>)> =
285            known_heads.iter().map(|h| (h.clone(), None)).collect();
286        // collect causal past of known_heads
287        // we silently discard any load error on the known_heads as the responder might not know them (yet).
288        Self::load_causal_past(
289            &mut recursor,
290            store,
291            &HashSet::new(),
292            &mut theirs,
293            &mut None,
294            &mut None,
295            &None,
296        )?;
297
298        // log_debug!("their causal past \n{}", Dag(&theirs));
299
300        let mut visited = HashMap::new();
301
302        let theirs: HashSet<ObjectId> = theirs.keys().into_iter().cloned().collect();
303
304        let filter = if let Some(filter) = known_commits.as_ref() {
305            Some(
306                filter.filter(), //.map_err(|_| ObjectParseError::FilterDeserializationError)?,
307            )
308        } else {
309            None
310        };
311
312        let mut recursor: Vec<(ObjectId, Option<ObjectId>)> =
313            target_heads.map(|h| (h.clone(), None)).collect();
314        // collect all commits reachable from target_heads
315        // up to the root or until encountering a commit from theirs
316        // we silently discard any load error on the target_heads as they can be wrong if the requester is confused about what the responder has locally.
317        Self::load_causal_past(
318            &mut recursor,
319            store,
320            &theirs,
321            &mut visited,
322            &mut None,
323            &mut None,
324            &filter,
325        )?;
326        // for id in target_heads {
327        //     if let Ok(cobj) = Object::load(id, None, store) {
328        //         Self::load_causal_past(
329        //             &cobj,
330        //             store,
331        //             &theirs,
332        //             &mut visited,
333        //             &mut None,
334        //             None,
335        //             &mut None,
336        //             &filter,
337        //         )?;
338        //     }
339
340        // }
341
342        // log_debug!("what we have here \n{}", Dag(&visited));
343
344        // now ordering to respect causal partial order.
345        let mut next_generations = HashSet::new();
346        for (_, node) in visited.iter() {
347            for future in node.future.iter() {
348                next_generations.insert(future);
349            }
350        }
351        let all = HashSet::from_iter(visited.keys());
352        let first_generation = all.difference(&next_generations);
353
354        let mut already_in: HashSet<ObjectId> = HashSet::new();
355
356        let sub_dag_to_send_size = visited.len();
357        let mut result = Vec::with_capacity(sub_dag_to_send_size);
358        let dag_ids: HashSet<ObjectId> = visited.keys().cloned().collect();
359        for first in first_generation {
360            result.append(&mut DagNode::collapse(
361                first,
362                &visited,
363                &dag_ids,
364                &mut already_in,
365            ));
366        }
367        // log_debug!(
368        //     "DAG {} {} {}",
369        //     result.len(),
370        //     sub_dag_to_send_size,
371        //     already_in.len()
372        // );
373        if result.len() != sub_dag_to_send_size || already_in.len() != sub_dag_to_send_size {
374            return Err(ObjectParseError::MalformedDag);
375        }
376
377        #[cfg(debug_assertions)]
378        for _res in result.iter() {
379            log_debug!("sending missing commit {}", _res);
380        }
381
382        Ok(result)
383    }
384}
385
#[allow(unused_imports)]
#[cfg(test)]
mod test {

    //use bloomfilter::Bloom;

    use crate::branch::*;

    use crate::repo::Repo;

    use crate::log::*;
    use crate::store::Store;
    use crate::utils::*;

    // Builds the DAG br -> (t1, t2) -> t4 -> t5 in a dummy store, then checks
    // that a sync request from a peer knowing only t1 returns [t2, t4, t5].
    #[test]
    pub fn test_branch() {
        // Saves an object (content + optional commit header) in the store and
        // returns its reference (id + key).
        fn add_obj(
            content: ObjectContentV0,
            header: Option<CommitHeader>,
            store: &Store,
        ) -> ObjectRef {
            let max_object_size = 4000;
            let mut obj = Object::new(ObjectContent::V0(content), header, max_object_size, store);
            obj.save_in_test(store).unwrap();
            obj.reference().unwrap()
        }

        // Builds, signs and stores a commit with the given deps/acks and body,
        // returning the stored commit object's reference.
        fn add_commit(
            branch: BranchId,
            author_privkey: PrivKey,
            author_pubkey: PubKey,
            deps: Vec<ObjectRef>,
            acks: Vec<ObjectRef>,
            body_ref: ObjectRef,
            store: &Store,
        ) -> ObjectRef {
            let header = CommitHeader::new_with_deps_and_acks(
                deps.iter().map(|r| r.id).collect(),
                acks.iter().map(|r| r.id).collect(),
            );

            let overlay = store.get_store_repo().overlay_id_for_read_purpose();

            // dummy reference used as the commit's `refs` entry
            let obj_ref = ObjectRef {
                id: ObjectId::Blake3Digest32([1; 32]),
                key: SymKey::ChaCha20Key([2; 32]),
            };
            let refs = vec![obj_ref];
            let metadata = vec![5u8; 55];

            let commit = CommitV0::new(
                &author_privkey,
                &author_pubkey,
                overlay,
                branch,
                QuorumType::NoSigning,
                deps,
                vec![],
                acks,
                vec![],
                refs,
                vec![],
                metadata,
                body_ref,
            )
            .unwrap();
            //log_debug!("commit: {:?}", commit);
            add_obj(ObjectContentV0::Commit(Commit::V0(commit)), header, store)
        }

        // Stores a Branch-definition commit body.
        fn add_body_branch(branch: BranchV0, store: &Store) -> ObjectRef {
            let body: CommitBodyV0 = CommitBodyV0::Branch(Branch::V0(branch));
            //log_debug!("body: {:?}", body);
            add_obj(
                ObjectContentV0::CommitBody(CommitBody::V0(body)),
                None,
                store,
            )
        }

        // Stores an async-transaction commit body filled with 777 `content` bytes.
        fn add_body_trans(header: Option<CommitHeader>, content: u8, store: &Store) -> ObjectRef {
            let content = [content; 777].to_vec();
            let body = CommitBodyV0::AsyncTransaction(Transaction::V0(content));
            //log_debug!("body: {:?}", body);
            add_obj(
                ObjectContentV0::CommitBody(CommitBody::V0(body)),
                header,
                store,
            )
        }

        // repo

        let (repo_privkey, repo_pubkey) = generate_keypair();
        let store = Store::dummy_with_key(repo_pubkey);

        // branch

        let (_, branch_pubkey) = generate_keypair();

        let (member_privkey, member_pubkey) = generate_keypair();

        let metadata = [66u8; 64].to_vec();

        let repo = Repo::new_with_member(
            &repo_pubkey,
            &member_pubkey,
            &[PermissionV0::WriteAsync],
            store,
        );

        let repo_ref = ObjectRef {
            id: ObjectId::Blake3Digest32([1; 32]),
            key: SymKey::ChaCha20Key([2; 32]),
        };

        let root_branch_def_id = ObjectId::Blake3Digest32([1; 32]);

        let branch = BranchV0::new(
            branch_pubkey,
            repo_ref,
            root_branch_def_id,
            repo_privkey,
            metadata,
        );
        //log_debug!("branch: {:?}", branch);

        // Logs the shape of the DAG assembled below.
        fn print_branch() {
            log_debug!("branch deps/acks:");
            log_debug!("");
            log_debug!("     br");
            log_debug!("    /  \\");
            log_debug!("  t1   t2");
            log_debug!("    \\  /");
            log_debug!("     t4");
            log_debug!("      |");
            log_debug!("     t5");
            log_debug!("");
        }

        print_branch();

        // commit bodies

        let branch_body = add_body_branch(branch.clone(), &repo.store);

        let trans_body = add_body_trans(None, 8, &repo.store);
        let trans_body2 = add_body_trans(None, 9, &repo.store);

        // create & add commits to store

        let br = add_commit(
            branch_pubkey,
            member_privkey.clone(),
            member_pubkey,
            vec![],
            vec![],
            branch_body.clone(),
            &repo.store,
        );
        log_debug!(">> br {}", br.id);

        let t1 = add_commit(
            branch_pubkey,
            member_privkey.clone(),
            member_pubkey,
            vec![],
            vec![br.clone()],
            trans_body.clone(),
            &repo.store,
        );
        log_debug!(">> t1 {}", t1.id);

        let t2 = add_commit(
            branch_pubkey,
            member_privkey.clone(),
            member_pubkey,
            vec![],
            vec![br.clone()],
            trans_body2.clone(),
            &repo.store,
        );
        log_debug!(">> t2 {}", t2.id);

        let t4 = add_commit(
            branch_pubkey,
            member_privkey.clone(),
            member_pubkey,
            vec![],
            vec![t1.clone(), t2.clone()],
            trans_body.clone(),
            &repo.store,
        );
        log_debug!(">> t4 {}", t4.id);

        let t5 = add_commit(
            branch_pubkey,
            member_privkey.clone(),
            member_pubkey,
            vec![],
            vec![t4.clone()],
            trans_body.clone(),
            &repo.store,
        );
        log_debug!(">> t5 {}", t5.id);

        // verify the head commit (loads and checks its causal chain)
        let c5 = Commit::load(t5.clone(), &repo.store, true).unwrap();
        c5.verify(&repo).unwrap();

        // let mut filter = Filter::new(FilterBuilder::new(10, 0.01));
        // for commit_ref in [br, t1, t2, t5.clone(), a6.clone()] {
        //     match commit_ref.id {
        //         ObjectId::Blake3Digest32(d) => filter.add(&d),
        //     }
        // }
        // let cfg = filter.config();
        // let their_commits = BloomFilter {
        //     k: cfg.hashes,
        //     f: filter.get_u8_array().to_vec(),
        // };

        // requester wants t5 and already has t1: expect t2, t4, t5 in causal order
        let ids = Branch::sync_req([t5.id].into_iter(), &[t1.id], &None, &repo.store).unwrap();

        assert_eq!(ids.len(), 3);
        assert_eq!(ids, [t2.id, t4.id, t5.id]);
    }
}