guardian_db/ipfs_log/
entry.rs

1use crate::ipfs_core_api::client::IpfsClient;
2use crate::ipfs_log::access_controller::LogEntry;
3use crate::ipfs_log::identity::Identity;
4use crate::ipfs_log::lamport_clock::LamportClock;
5use futures::future::join_all;
6use serde::{Deserialize, Serialize};
7use serde_json::json;
8use std::cmp::Ordering;
9use std::io::Cursor;
10use std::sync::Arc;
11use std::sync::Mutex;
12use tokio::runtime::Runtime;
13
/// Errors produced while storing, loading or decoding log entries.
#[derive(Debug)]
pub enum Error {
    /// Wraps a `GuardianError`; displayed with the `IPFS error:` prefix.
    Ipfs(crate::error::GuardianError),
    /// Wraps a `serde_json` (de)serialization error.
    Json(serde_json::Error),
    /// Wraps a standard I/O error.
    Io(std::io::Error),
    /// Any other failure, carried as a free-form message.
    Other(String),
}
21
22impl From<crate::error::GuardianError> for Error {
23    fn from(err: crate::error::GuardianError) -> Self {
24        Error::Ipfs(err)
25    }
26}
27
28impl From<serde_json::Error> for Error {
29    fn from(err: serde_json::Error) -> Self {
30        Error::Json(err)
31    }
32}
33
34impl From<std::io::Error> for Error {
35    fn from(err: std::io::Error) -> Self {
36        Error::Io(err)
37    }
38}
39
40impl std::fmt::Display for Error {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        match self {
43            Error::Ipfs(e) => write!(f, "IPFS error: {}", e),
44            Error::Json(e) => write!(f, "JSON error: {}", e),
45            Error::Io(e) => write!(f, "IO error: {}", e),
46            Error::Other(s) => write!(f, "Other error: {}", s),
47        }
48    }
49}
50
// Marks `Error` as a standard error type. All trait methods keep their
// defaults, so `source()` returns `None`; the wrapped error is only exposed
// through the `Display` output.
impl std::error::Error for Error {}
52
/// A parent reference used when building an entry: either a borrowed
/// [`Entry`] or a hash given directly as a string.
pub enum EntryOrHash<'a> {
    /// A borrowed entry; `Entry::new` copies its `hash` field.
    Entry(&'a Entry),
    /// An owned hash string; `Entry::new` copies it as-is.
    Hash(String),
}
59
/// An entry containing data payload, a hash to locate it in [`IPFS`],
/// and pointers to its parents.
///
/// [`IPFS`]: https://ipfs.io
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct Entry {
    /// Hash identifying this entry; equality between entries compares only this field.
    pub hash: String,
    /// Identifier of the log that contains this entry.
    pub id: String,
    /// Data payload carried by the entry.
    pub payload: String,
    /// Hashes of the parent entries.
    pub next: Vec<String>,
    /// Version number (`Entry::new` sets 1, `Entry::empty` sets 0).
    pub v: u32,
    /// Lamport clock attached to the entry.
    pub clock: LamportClock,
    /// Optional identity associated with the entry. Skipped by serde, so it is
    /// never serialized and is `None` on any deserialized entry.
    #[serde(skip)]
    pub identity: Option<Arc<Identity>>,
}
76
// Explicit `Send` + `Sync` implementations for `Entry`.
//
// SAFETY / NOTE(review): the fields of `Entry` are `String`, `Vec<String>`,
// `u32`, `LamportClock` and `Option<Arc<Identity>>`. If `LamportClock` and
// `Identity` are themselves `Send + Sync`, the compiler would derive both auto
// traits anyway and these impls are redundant; if they are not, these impls
// are unsound. Confirm against the definitions of those two types.
unsafe impl Send for Entry {}
unsafe impl Sync for Entry {}
80
81impl Entry {
82    //very ad hoc
83    #[doc(hidden)]
84    pub fn empty() -> Entry {
85        let s = "0000";
86        Entry {
87            hash: s.to_owned(),
88            id: s.to_owned(),
89            payload: s.to_owned(),
90            next: Vec::new(),
91            v: 0,
92            clock: LamportClock::new(s),
93            identity: None,
94        }
95    }
96
97    #[doc(hidden)]
98    pub fn new(
99        identity: Identity,
100        log_id: &str,
101        data: &str,
102        next: &[EntryOrHash],
103        clock: Option<LamportClock>,
104    ) -> Entry {
105        //None filtering required?
106        let next = next
107            .iter()
108            .map(|n| match n {
109                EntryOrHash::Entry(e) => e.hash.to_owned(),
110                EntryOrHash::Hash(h) => h.to_owned(),
111            })
112            .collect();
113        Entry {
114            //very much ad hoc
115            hash: data.to_owned(),
116            id: log_id.to_owned(),
117            payload: data.to_owned(),
118            next,
119            v: 1,
120            clock: clock.unwrap_or(LamportClock::new(identity.pub_key())),
121            identity: Some(Arc::new(identity)),
122        }
123    }
124
125    /// Locally creates an entry owned by `identity` .
126    ///
127    ///  The created entry is part of the [log] with the id `log_id`,
128    /// holds payload of `data` and can be assigned to point to
129    /// at most two parents with their hashes in `nexts`. Providing a
130    /// [Lamport clock] via `clock` is optional.
131    ///
132    /// Returns a [reference-counting pointer] to the created entry.
133    ///
134    /// [log]: ../log/struct.Log.html
135    /// [Lamport clock]: ../lamport_clock/struct.LamportClock.html
136    /// [reference-counting pointer]: https://doc.rust-lang.org/std/rc/struct.Rc.html
137    pub fn create(
138        ipfs: &IpfsClient,
139        identity: Identity,
140        log_id: &str,
141        data: &str,
142        nexts: &[EntryOrHash],
143        clock: Option<LamportClock>,
144    ) -> Arc<Entry> {
145        let mut e = Entry::new(identity, log_id, data, nexts, clock);
146        let hash_result = Runtime::new()
147            .unwrap()
148            .block_on(Entry::multihash(ipfs, &e))
149            .unwrap();
150        e.hash = hash_result;
151        Arc::new(e)
152    }
153
154    /// Stores `entry` in the IPFS client `ipfs` and returns a future containing its multihash.
155    ///
156    /// **N.B.** *At the moment stores the entry as JSON, not CBOR DAG.*
157    pub async fn multihash(ipfs: &IpfsClient, entry: &Entry) -> Result<String, Error> {
158        let e = json!({
159            "hash": "null",
160            "id": entry.id,
161            "payload": entry.payload,
162            "next": entry.next,
163            "v": entry.v,
164            "clock": entry.clock,
165        })
166        .to_string();
167
168        match ipfs.add(Cursor::new(e)).await {
169            Ok(response) => Ok(response.hash),
170            Err(e) => Err(Error::from(e)),
171        }
172    }
173
174    /// Returns the future containing the entry stored in the IPFS client `ipfs` with the multihash `hash`.
175    ///
176    /// **N.B.** *At the moment converts the entry from JSON, not CBOR DAG.*
177    pub async fn from_multihash(ipfs: &IpfsClient, hash: &str) -> Result<Entry, Error> {
178        let h = hash.to_owned();
179        match ipfs.cat(hash).await {
180            Ok(mut reader) => {
181                let mut bytes = Vec::new();
182                match tokio::io::AsyncReadExt::read_to_end(&mut reader, &mut bytes).await {
183                    Ok(_) => {
184                        match serde_json::from_str::<Entry>(std::str::from_utf8(&bytes).unwrap()) {
185                            Ok(mut e) => {
186                                e.hash = h;
187                                Ok(e)
188                            }
189                            Err(json_err) => Err(Error::from(json_err)),
190                        }
191                    }
192                    Err(io_err) => Err(Error::from(io_err)),
193                }
194            }
195            Err(e) => Err(Error::from(e)),
196        }
197    }
198
199    /// Fetches all the entries with the hashes in `hashes` and all their parents from the IPFS client `ipfs`.
200    ///
201    /// Returns a vector of entries.
202    pub fn fetch_entries(ipfs: &IpfsClient, hashes: &[String]) -> Vec<Entry> {
203        let hashes = Arc::new(Mutex::new(hashes.to_vec()));
204        let mut es = Vec::new();
205        loop {
206            let mut result = Vec::new();
207            while !hashes.lock().unwrap().is_empty() {
208                let h = hashes.lock().unwrap().remove(0);
209                let hashes_clone = hashes.clone();
210                result.push(async move {
211                    match Entry::from_multihash(ipfs, &h).await {
212                        Ok(entry) => {
213                            for n in &entry.next {
214                                hashes_clone.lock().unwrap().push(n.to_owned());
215                            }
216                            Ok(entry)
217                        }
218                        Err(e) => Err(e),
219                    }
220                });
221            }
222
223            let new_entries: Vec<Entry> = Runtime::new()
224                .unwrap()
225                .block_on(join_all(result))
226                .into_iter()
227                .filter_map(|r| r.ok())
228                .collect();
229
230            es.extend(new_entries);
231
232            if hashes.lock().unwrap().is_empty() {
233                break;
234            }
235        }
236        es
237    }
238
239    /// Returns the hash of the entry.
240    pub fn hash(&self) -> &str {
241        &self.hash
242    }
243
244    /// Sets the hash of the entry.
245    pub fn set_hash(&mut self, hash: &str) {
246        self.hash = hash.to_owned();
247    }
248
249    /// Returns the identifier of the entry that is the same as of the containing log.
250    pub fn id(&self) -> &str {
251        &self.id
252    }
253
254    /// Returns the data payload of the entry.
255    pub fn payload(&self) -> &str {
256        &self.payload
257    }
258
259    /// Returns the hashes of the parents.
260    ///
261    /// The length of the returned slice is either:
262    /// * 0 &mdash; no parents
263    /// * 2 &mdash; two identical strings for one parent, two distinct strings for two different parents
264    pub fn next(&self) -> &[String] {
265        &self.next
266    }
267
    /// Borrows the Lamport clock attached to the entry.
    pub fn clock(&self) -> &LamportClock {
        &self.clock
    }
272
273    /// Returns `true` if `e1` is the parent of `e2`, otherwise returns `false`.
274    pub fn is_parent(e1: &Entry, e2: &Entry) -> bool {
275        e2.next().iter().any(|x| x == e1.hash())
276    }
277
278    /// Returns a vector of pointers to all direct and indirect children of `entry` in `entries`.
279    pub fn find_children(entry: &Entry, entries: &[Arc<Entry>]) -> Vec<Arc<Entry>> {
280        let mut stack = Vec::new();
281        let mut parent = entries.iter().find(|e| Entry::is_parent(entry, e));
282        while let Some(p) = parent {
283            stack.push(p.clone());
284            let prev = p;
285            parent = entries.iter().find(|e| Entry::is_parent(prev, e));
286        }
287        stack.sort_by_key(|a| a.clock().time());
288        stack
289    }
290
291    /// A sorting function to pick the more recently written entry.
292    ///
293    /// Uses [`sort_step_by_step`], resolving unsorted cases in the manner defined in it.
294    ///
295    /// Returns an ordering.
296    ///
297    /// [`sort_step_by_step`]: #method.sort_step_by_step
298    pub fn last_write_wins(a: &Entry, b: &Entry) -> Ordering {
299        Entry::sort_step_by_step(|_, _| Ordering::Less)(a, b)
300    }
301
302    /// A sorting function to pick the entry with the greater hash.
303    ///
304    /// Uses [`sort_step_by_step`], resolving unsorted cases in the manner defined in it.
305    ///
306    /// Returns an ordering.
307    ///
308    /// [`sort_step_by_step`]: #method.sort_step_by_step
309    pub fn sort_by_entry_hash(a: &Entry, b: &Entry) -> Ordering {
310        Entry::sort_step_by_step(|a, b| a.hash().cmp(b.hash()))(a, b)
311    }
312
313    /// A sorting helper function to
314    /// 1. first try to sort the two entries using `resolve`,
315    /// 2. if still unsorted (equal), try to sort based on the Lamport clock identifiers of the respective entries,
316    /// 3. sort by the Lamport clocks of the respective entries.
317    ///
318    /// Returns a closure that can be used as a sorting function.
319    pub fn sort_step_by_step<F>(resolve: F) -> impl Fn(&Entry, &Entry) -> Ordering
320    where
321        F: 'static + Fn(&Entry, &Entry) -> Ordering,
322    {
323        Entry::sort_by_clocks(Entry::sort_by_clock_ids(resolve))
324    }
325
326    /// A sorting helper function to sort by the Lamport clocks of the respective entries.
327    /// In the case the Lamport clocks are equal, tries to sort using `resolve`.
328    ///
329    /// Returns a closure that can be used as a sorting function.
330    pub fn sort_by_clocks<F>(resolve: F) -> impl Fn(&Entry, &Entry) -> Ordering
331    where
332        F: 'static + Fn(&Entry, &Entry) -> Ordering,
333    {
334        move |a, b| {
335            let mut diff = a.clock().cmp(b.clock());
336            if diff == Ordering::Equal {
337                diff = resolve(a, b);
338            }
339            diff
340        }
341    }
342
343    /// A sorting helper function to sort by the Lamport clock identifiers of the respective entries.
344    /// In the case the Lamport clocks identifiers are equal, tries to sort using `resolve`.
345    ///
346    /// Returns a closure that can be used as a sorting function.
347    pub fn sort_by_clock_ids<F>(resolve: F) -> impl Fn(&Entry, &Entry) -> Ordering
348    where
349        F: 'static + Fn(&Entry, &Entry) -> Ordering,
350    {
351        move |a, b| {
352            let mut diff = a.clock().id().cmp(b.clock().id());
353            if diff == Ordering::Equal {
354                diff = resolve(a, b);
355            }
356            diff
357        }
358    }
359
360    /// A sorting helper function that forbids the sorting function `sort_fn` from
361    /// producing unsorted (equal) cases.
362    ///
363    /// Returns a closure that behaves in the same way as `sort_fn`
364    /// but panics if the two entries given as input are equal.
365    pub fn no_zeroes<F>(sort_fn: F) -> impl Fn(&Entry, &Entry) -> Ordering
366    where
367        F: 'static + Fn(&Entry, &Entry) -> Ordering,
368    {
369        move |a, b| {
370            let diff = sort_fn(a, b);
371            if diff == Ordering::Equal {
372                panic!(
373                    "Your log's tiebreaker function {}",
374                    "has returned zero and therefore cannot be"
375                );
376            }
377            diff
378        }
379    }
380}
381
382impl PartialEq for Entry {
383    fn eq(&self, other: &Self) -> bool {
384        self.hash == other.hash
385    }
386}
387
388impl Eq for Entry {}
389
390impl Ord for Entry {
391    fn cmp(&self, other: &Self) -> Ordering {
392        let diff = self.clock().cmp(other.clock());
393        if diff == Ordering::Equal {
394            self.clock().id().cmp(other.clock().id())
395        } else {
396            diff
397        }
398    }
399}
400
/// Partial ordering that always agrees with the total ordering from `Ord`.
impl PartialOrd for Entry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        // Delegating to `cmp` keeps the two orderings identical.
        Some(self.cmp(other))
    }
}
406
407// Implementação do trait LogEntry para Entry
408impl LogEntry for Entry {
409    fn get_payload(&self) -> &[u8] {
410        self.payload.as_bytes()
411    }
412
413    fn get_identity(&self) -> &Identity {
414        // Se temos uma identidade armazenada, a retornamos
415        if let Some(ref identity_arc) = self.identity {
416            return identity_arc.as_ref();
417        }
418
419        // Caso contrário, retornamos uma identidade padrão baseada no clock ID
420        use crate::ipfs_log::identity::Signatures;
421        use std::sync::OnceLock;
422
423        static DEFAULT_IDENTITY: OnceLock<Identity> = OnceLock::new();
424        DEFAULT_IDENTITY.get_or_init(|| {
425            let signatures = Signatures::new("", "");
426            Identity::new("unknown", "unknown", signatures)
427        })
428    }
429}