1use crate::ipfs_core_api::client::IpfsClient;
2use crate::ipfs_log::access_controller::LogEntry;
3use crate::ipfs_log::identity::Identity;
4use crate::ipfs_log::lamport_clock::LamportClock;
5use futures::future::join_all;
6use serde::{Deserialize, Serialize};
7use serde_json::json;
8use std::cmp::Ordering;
9use std::io::Cursor;
10use std::sync::Arc;
11use std::sync::Mutex;
12use tokio::runtime::Runtime;
13
/// Errors that can occur while storing, fetching, or decoding log entries.
#[derive(Debug)]
pub enum Error {
    /// Failure reported by the IPFS client, wrapping the crate-level `GuardianError`.
    Ipfs(crate::error::GuardianError),
    /// JSON serialization or deserialization failure.
    Json(serde_json::Error),
    /// I/O failure, e.g. while reading the content stream of a fetched entry.
    Io(std::io::Error),
    /// Any other failure, described by a free-form message.
    Other(String),
}
21
22impl From<crate::error::GuardianError> for Error {
23 fn from(err: crate::error::GuardianError) -> Self {
24 Error::Ipfs(err)
25 }
26}
27
28impl From<serde_json::Error> for Error {
29 fn from(err: serde_json::Error) -> Self {
30 Error::Json(err)
31 }
32}
33
34impl From<std::io::Error> for Error {
35 fn from(err: std::io::Error) -> Self {
36 Error::Io(err)
37 }
38}
39
40impl std::fmt::Display for Error {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 match self {
43 Error::Ipfs(e) => write!(f, "IPFS error: {}", e),
44 Error::Json(e) => write!(f, "JSON error: {}", e),
45 Error::Io(e) => write!(f, "IO error: {}", e),
46 Error::Other(s) => write!(f, "Other error: {}", s),
47 }
48 }
49}
50
// Marks `Error` as a standard error type. No methods are overridden, so the
// trait's defaults apply (in particular, no `source()` chain is exposed; the
// wrapped error is only visible through the `Display` output).
impl std::error::Error for Error {}
52
/// A reference to a log entry, given either as the entry itself or as its hash.
///
/// Used by `Entry::new` / `Entry::create` to describe the `next` pointers of a
/// new entry; in both cases only the hash string ends up being stored.
pub enum EntryOrHash<'a> {
    /// A borrowed entry; its `hash` field is used.
    Entry(&'a Entry),
    /// An already-known hash string.
    Hash(String),
}
59
/// A single entry of the log.
///
/// All fields except `identity` are (de)serialized with serde.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct Entry {
    /// Hash identifying this entry. `Entry::create` sets it to the value
    /// returned by `Entry::multihash`; `Entry::from_multihash` sets it to the
    /// hash the entry was fetched by.
    pub hash: String,
    /// Identifier of the log this entry belongs to.
    pub id: String,
    /// The entry's payload data.
    pub payload: String,
    /// Hashes of the entries this entry points to.
    pub next: Vec<String>,
    /// Version marker: `Entry::new` writes 1, `Entry::empty` writes 0.
    pub v: u32,
    /// Lamport clock attached to this entry; used for ordering.
    pub clock: LamportClock,
    /// Identity attached at construction time. Skipped by serde, so it is
    /// never serialized and is `None` on entries obtained by deserialization.
    #[serde(skip)]
    pub identity: Option<Arc<Identity>>,
}
76
// SAFETY / NOTE(review): these manual impls assert that `Entry` may be moved
// to and shared between threads. `String`, `Vec<String>` and `u32` already are
// `Send + Sync`; `Option<Arc<Identity>>` and `LamportClock` are only if
// `Identity` and `LamportClock` themselves are. If they are, the compiler
// derives both traits automatically and these impls are redundant; if they are
// not, these impls are unsound. TODO confirm against those types and either
// remove the impls or document the invariant that makes them sound.
unsafe impl Send for Entry {}
unsafe impl Sync for Entry {}
80
81impl Entry {
82 #[doc(hidden)]
84 pub fn empty() -> Entry {
85 let s = "0000";
86 Entry {
87 hash: s.to_owned(),
88 id: s.to_owned(),
89 payload: s.to_owned(),
90 next: Vec::new(),
91 v: 0,
92 clock: LamportClock::new(s),
93 identity: None,
94 }
95 }
96
97 #[doc(hidden)]
98 pub fn new(
99 identity: Identity,
100 log_id: &str,
101 data: &str,
102 next: &[EntryOrHash],
103 clock: Option<LamportClock>,
104 ) -> Entry {
105 let next = next
107 .iter()
108 .map(|n| match n {
109 EntryOrHash::Entry(e) => e.hash.to_owned(),
110 EntryOrHash::Hash(h) => h.to_owned(),
111 })
112 .collect();
113 Entry {
114 hash: data.to_owned(),
116 id: log_id.to_owned(),
117 payload: data.to_owned(),
118 next,
119 v: 1,
120 clock: clock.unwrap_or(LamportClock::new(identity.pub_key())),
121 identity: Some(Arc::new(identity)),
122 }
123 }
124
125 pub fn create(
138 ipfs: &IpfsClient,
139 identity: Identity,
140 log_id: &str,
141 data: &str,
142 nexts: &[EntryOrHash],
143 clock: Option<LamportClock>,
144 ) -> Arc<Entry> {
145 let mut e = Entry::new(identity, log_id, data, nexts, clock);
146 let hash_result = Runtime::new()
147 .unwrap()
148 .block_on(Entry::multihash(ipfs, &e))
149 .unwrap();
150 e.hash = hash_result;
151 Arc::new(e)
152 }
153
154 pub async fn multihash(ipfs: &IpfsClient, entry: &Entry) -> Result<String, Error> {
158 let e = json!({
159 "hash": "null",
160 "id": entry.id,
161 "payload": entry.payload,
162 "next": entry.next,
163 "v": entry.v,
164 "clock": entry.clock,
165 })
166 .to_string();
167
168 match ipfs.add(Cursor::new(e)).await {
169 Ok(response) => Ok(response.hash),
170 Err(e) => Err(Error::from(e)),
171 }
172 }
173
174 pub async fn from_multihash(ipfs: &IpfsClient, hash: &str) -> Result<Entry, Error> {
178 let h = hash.to_owned();
179 match ipfs.cat(hash).await {
180 Ok(mut reader) => {
181 let mut bytes = Vec::new();
182 match tokio::io::AsyncReadExt::read_to_end(&mut reader, &mut bytes).await {
183 Ok(_) => {
184 match serde_json::from_str::<Entry>(std::str::from_utf8(&bytes).unwrap()) {
185 Ok(mut e) => {
186 e.hash = h;
187 Ok(e)
188 }
189 Err(json_err) => Err(Error::from(json_err)),
190 }
191 }
192 Err(io_err) => Err(Error::from(io_err)),
193 }
194 }
195 Err(e) => Err(Error::from(e)),
196 }
197 }
198
199 pub fn fetch_entries(ipfs: &IpfsClient, hashes: &[String]) -> Vec<Entry> {
203 let hashes = Arc::new(Mutex::new(hashes.to_vec()));
204 let mut es = Vec::new();
205 loop {
206 let mut result = Vec::new();
207 while !hashes.lock().unwrap().is_empty() {
208 let h = hashes.lock().unwrap().remove(0);
209 let hashes_clone = hashes.clone();
210 result.push(async move {
211 match Entry::from_multihash(ipfs, &h).await {
212 Ok(entry) => {
213 for n in &entry.next {
214 hashes_clone.lock().unwrap().push(n.to_owned());
215 }
216 Ok(entry)
217 }
218 Err(e) => Err(e),
219 }
220 });
221 }
222
223 let new_entries: Vec<Entry> = Runtime::new()
224 .unwrap()
225 .block_on(join_all(result))
226 .into_iter()
227 .filter_map(|r| r.ok())
228 .collect();
229
230 es.extend(new_entries);
231
232 if hashes.lock().unwrap().is_empty() {
233 break;
234 }
235 }
236 es
237 }
238
239 pub fn hash(&self) -> &str {
241 &self.hash
242 }
243
244 pub fn set_hash(&mut self, hash: &str) {
246 self.hash = hash.to_owned();
247 }
248
249 pub fn id(&self) -> &str {
251 &self.id
252 }
253
254 pub fn payload(&self) -> &str {
256 &self.payload
257 }
258
259 pub fn next(&self) -> &[String] {
265 &self.next
266 }
267
268 pub fn clock(&self) -> &LamportClock {
270 &self.clock
271 }
272
273 pub fn is_parent(e1: &Entry, e2: &Entry) -> bool {
275 e2.next().iter().any(|x| x == e1.hash())
276 }
277
278 pub fn find_children(entry: &Entry, entries: &[Arc<Entry>]) -> Vec<Arc<Entry>> {
280 let mut stack = Vec::new();
281 let mut parent = entries.iter().find(|e| Entry::is_parent(entry, e));
282 while let Some(p) = parent {
283 stack.push(p.clone());
284 let prev = p;
285 parent = entries.iter().find(|e| Entry::is_parent(prev, e));
286 }
287 stack.sort_by_key(|a| a.clock().time());
288 stack
289 }
290
291 pub fn last_write_wins(a: &Entry, b: &Entry) -> Ordering {
299 Entry::sort_step_by_step(|_, _| Ordering::Less)(a, b)
300 }
301
302 pub fn sort_by_entry_hash(a: &Entry, b: &Entry) -> Ordering {
310 Entry::sort_step_by_step(|a, b| a.hash().cmp(b.hash()))(a, b)
311 }
312
313 pub fn sort_step_by_step<F>(resolve: F) -> impl Fn(&Entry, &Entry) -> Ordering
320 where
321 F: 'static + Fn(&Entry, &Entry) -> Ordering,
322 {
323 Entry::sort_by_clocks(Entry::sort_by_clock_ids(resolve))
324 }
325
326 pub fn sort_by_clocks<F>(resolve: F) -> impl Fn(&Entry, &Entry) -> Ordering
331 where
332 F: 'static + Fn(&Entry, &Entry) -> Ordering,
333 {
334 move |a, b| {
335 let mut diff = a.clock().cmp(b.clock());
336 if diff == Ordering::Equal {
337 diff = resolve(a, b);
338 }
339 diff
340 }
341 }
342
343 pub fn sort_by_clock_ids<F>(resolve: F) -> impl Fn(&Entry, &Entry) -> Ordering
348 where
349 F: 'static + Fn(&Entry, &Entry) -> Ordering,
350 {
351 move |a, b| {
352 let mut diff = a.clock().id().cmp(b.clock().id());
353 if diff == Ordering::Equal {
354 diff = resolve(a, b);
355 }
356 diff
357 }
358 }
359
360 pub fn no_zeroes<F>(sort_fn: F) -> impl Fn(&Entry, &Entry) -> Ordering
366 where
367 F: 'static + Fn(&Entry, &Entry) -> Ordering,
368 {
369 move |a, b| {
370 let diff = sort_fn(a, b);
371 if diff == Ordering::Equal {
372 panic!(
373 "Your log's tiebreaker function {}",
374 "has returned zero and therefore cannot be"
375 );
376 }
377 diff
378 }
379 }
380}
381
382impl PartialEq for Entry {
383 fn eq(&self, other: &Self) -> bool {
384 self.hash == other.hash
385 }
386}
387
// Hash equality (see `PartialEq`) is a string comparison, hence a full
// equivalence relation.
impl Eq for Entry {}
389
390impl Ord for Entry {
391 fn cmp(&self, other: &Self) -> Ordering {
392 let diff = self.clock().cmp(other.clock());
393 if diff == Ordering::Equal {
394 self.clock().id().cmp(other.clock().id())
395 } else {
396 diff
397 }
398 }
399}
400
401impl PartialOrd for Entry {
402 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
403 Some(self.cmp(other))
404 }
405}
406
407impl LogEntry for Entry {
409 fn get_payload(&self) -> &[u8] {
410 self.payload.as_bytes()
411 }
412
413 fn get_identity(&self) -> &Identity {
414 if let Some(ref identity_arc) = self.identity {
416 return identity_arc.as_ref();
417 }
418
419 use crate::ipfs_log::identity::Signatures;
421 use std::sync::OnceLock;
422
423 static DEFAULT_IDENTITY: OnceLock<Identity> = OnceLock::new();
424 DEFAULT_IDENTITY.get_or_init(|| {
425 let signatures = Signatures::new("", "");
426 Identity::new("unknown", "unknown", signatures)
427 })
428 }
429}