crew_rs/
crew.rs

1use std::{collections::{hash_map::Entry, HashMap, HashSet}, rc::Rc};
2
3use audi::{Listener, ListenerSet};
4use endr::{SetItem, SetItemID, WriteAccess};
5use futures::{future, stream, FutureExt, Stream, StreamExt};
6use litl::{impl_debug_as_litl, impl_nested_tagged_data_serde, NestedTaggedData};
7use mofo::Mofo;
8use objt::objt;
9use ridl::{
10    signing::{SignatureError, Signed},
11    symm_encr::KeySecret,
12};
13use serde::Serialize;
14use serde_derive::{Deserialize, Serialize};
15use ti64::MsSinceEpoch;
16
17use crate::{
18    credential_source::CredentialSource,
19    crew_manager::WeakCrewManager,
20    timeline::{Timeline, UncheckedTimeline, UncheckedTimelineEntry},
21    AddRole, CrewChange, CrewChangeTx, CrewManager, CrewRulesV1, CrewState, MemberCredential,
22    RevealSecret, ADMIN_INVITATION_ROLE, ADMIN_ROLE, CONTENT_SECRET, PARTICIPATION_SECRET,
23    READER_ROLE, READ_INVITATION_ROLE, SET_WRITE_ACCESS_INFO_ID, WRITER_ROLE,
24    WRITE_INVITATION_ROLE,
25};
26
27#[derive(Clone, Copy, PartialEq, Eq, Hash)]
28pub struct CrewID(pub endr::ObjectID);
29
30impl_debug_as_litl!(CrewID);
31
32impl NestedTaggedData for CrewID {
33    const TAG: &'static str = "crew";
34
35    type Inner = endr::ObjectID;
36
37    fn as_inner(&self) -> &Self::Inner {
38        &self.0
39    }
40
41    fn from_inner(inner: Self::Inner) -> Self
42    where
43        Self: Sized,
44    {
45        CrewID(inner)
46    }
47}
48
49impl_nested_tagged_data_serde!(CrewID);
50
51pub struct CrewInner {
52    pub(crate) endr: endr::Node,
53    pub(crate) id: CrewID,
54    pub(crate) unchecked_timeline: UncheckedTimeline,
55    pub(crate) last_update: Option<Rc<CrewUpdate>>,
56    pub(crate) credential_source: Box<dyn CredentialSource>,
57    pub(crate) listeners: ListenerSet<Rc<CrewUpdate>>,
58    pub(crate) manager: CrewManager,
59    pub(crate) parents_unchecked: HashMap<CrewID, Crew>,
60    pub(crate) background: Mofo,
61    pub(crate) shared_secret_cache: HashMap<String, Rc<KeySecret>>,
62}
63
64#[derive(Serialize, Deserialize)]
65pub struct CrewUpdate {
66    pub(crate) timeline: Timeline,
67    pub(crate) valid_txs: HashMap<SetItemID, CrewChangeTx>,
68    pub(crate) invalid_txs: HashMap<SetItemID, (CrewChangeTx, String)>,
69}
70impl_debug_as_litl!(CrewUpdate);
71
72impl CrewUpdate {
73    pub fn current_state(&self) -> Option<&CrewState> {
74        self.timeline.last().map(|(_, state)| state)
75    }
76}
77
78objt!(
79    Crew,
80    WeakCrew,
81    CrewInner,
82    endr: endr::Node,
83    listeners: ListenerSet<Rc<CrewUpdate>>
84);
85
86impl Crew {
87    pub fn id(&self) -> CrewID {
88        self.borrow().id
89    }
90    pub fn current_state(&self) -> Option<CrewState> {
91        self.borrow()
92            .last_update
93            .as_ref()
94            .and_then(|u| u.current_state().cloned())
95    }
96}
97
98impl Crew {
99    pub async fn make_changes<I: IntoIterator<Item = CrewChange>>(
100        &self,
101        changes: I,
102    ) -> Result<SetItemID, String> {
103        let write_access = litl::from_val::<WriteAccess>(
104            self.get_entrusted_info(PARTICIPATION_SECRET, SET_WRITE_ACCESS_INFO_ID)?,
105        )
106        .map_err(|err| err.to_string())?;
107
108        self.make_changes_with_write_access(changes, write_access)
109            .await
110    }
111
112    pub(crate) async fn make_changes_with_write_access<I: IntoIterator<Item = CrewChange>>(
113        &self,
114        changes: I,
115        write_access: WriteAccess,
116    ) -> Result<SetItemID, String> {
117        let set_id = self.borrow().id.0;
118        let endr = self.endr();
119
120        let (credential, _, _) = self
121            .get_credentials_with_role(&HashSet::from_iter([
122                ADMIN_ROLE.to_string(),
123                WRITER_ROLE.to_string(),
124                ADMIN_INVITATION_ROLE.to_string(),
125                WRITE_INVITATION_ROLE.to_string(),
126                READ_INVITATION_ROLE.to_owned(),
127            ]))
128            .first()
129            .cloned()
130            .expect("Expected credential to make changes");
131
132        let tx = credential.sign_tx(changes);
133
134        let tx_item_id = endr
135            .insert_into_set_after_frontier(set_id, &write_access, tx)
136            .await
137            .map_err(|err| err.to_string())?;
138
139        // wait for tx to pop up as a valid one
140        let mut updates = self.updates(format!("Waiting for {:?}", tx_item_id));
141
142        while let Some(update) = updates.next().await {
143            if update.valid_txs.contains_key(&tx_item_id) {
144                return Ok(tx_item_id);
145            } else if let Some(reason) = update
146                .invalid_txs
147                .get(&tx_item_id)
148                .map(|(_, reason)| reason)
149            {
150                return Err(format!("Tx turned out invalid: {}", reason));
151            } else {
152                // hasn't appeared yet
153                // println!("{} hasn't appeared in update {:#?}", tx_item_id, update);
154            }
155        }
156
157        Err("Reached end of set updates before finding new tx".to_string())
158    }
159
160    pub fn get_credentials_with_role(
161        &self,
162        preferred_role_first: &HashSet<String>,
163    ) -> Vec<(MemberCredential, String, CrewID)> {
164        let credential_source = self.borrow().credential_source.clone_ref();
165        let all_credentials = credential_source.current_credentials_for(&self.id());
166        let mut collected_credentials = Vec::new();
167
168        if let Some(current_state) = self.current_state() {
169            let parents = current_state
170                .parents
171                .iter()
172                .map(|parent_id| {
173                    self.borrow()
174                        .parents_unchecked
175                        .get(parent_id)
176                        .unwrap()
177                        .clone()
178                })
179                .collect::<Vec<_>>();
180
181
182            for role in preferred_role_first {
183                for credential in &all_credentials {
184                    if current_state
185                        .roles_of(&credential.signer().pub_id())
186                        .contains(role)
187                    {
188                        collected_credentials.push((credential.clone(), role.clone(), self.id()));
189                    }
190
191                }
192
193                for parent in &parents {
194                    collected_credentials.extend(
195                        parent
196                            .get_credentials_with_role(&HashSet::from_iter([role.to_owned()])),
197                    );
198                }
199            }
200        }
201
202        collected_credentials
203    }
204
205    fn get_parent_for_ancestor(&self, ancestor_id: &CrewID) -> Option<Crew> {
206        let own_parents = &self.borrow().parents_unchecked;
207        own_parents.get(ancestor_id).cloned().or_else(|| {
208            own_parents.values().find_map(|parent| {
209                if parent.get_parent_for_ancestor(ancestor_id).is_some() {
210                    Some(parent.clone_ref())
211                } else {
212                    None
213                }
214            })
215        })
216    }
217
218    // TODO: super simplifying assumption for now: we only ever get more secrets, one secret per kind
219    pub fn get_shared_secret(&self, secret_kind: &str) -> Result<Rc<KeySecret>, String> {
220        if let Some(secret) = self.borrow_mut().shared_secret_cache.get(secret_kind) {
221            return Ok(Rc::clone(secret));
222        }
223
224        let secret = Rc::new(self.get_shared_secret_uncached(secret_kind)?);
225        self.borrow_mut().shared_secret_cache.insert(secret_kind.to_owned(), Rc::clone(&secret));
226        Ok(secret)
227    }
228
229    fn get_shared_secret_uncached(&self, secret_kind: &str) -> Result<KeySecret, String> {
230        let credentials = match secret_kind {
231            CONTENT_SECRET => self.get_credentials_with_role(&HashSet::from_iter([
232                ADMIN_ROLE.to_owned(),
233                WRITER_ROLE.to_owned(),
234                READER_ROLE.to_owned(),
235                ADMIN_INVITATION_ROLE.to_owned(),
236                WRITE_INVITATION_ROLE.to_owned(),
237                READ_INVITATION_ROLE.to_owned(),
238            ])),
239            PARTICIPATION_SECRET => self.get_credentials_with_role(&HashSet::from_iter([
240                ADMIN_ROLE.to_owned(),
241                WRITER_ROLE.to_owned(),
242                ADMIN_INVITATION_ROLE.to_owned(),
243                WRITE_INVITATION_ROLE.to_owned(),
244                READ_INVITATION_ROLE.to_owned(),
245            ])),
246            _ => self.get_credentials_with_role(&HashSet::from_iter([ADMIN_ROLE.to_owned()])),
247        };
248
249        match &self.current_state() {
250            Some(state) => {
251                let (oks, errs) = credentials
252                    .iter()
253                    .map(|(credential, credential_role, source_crew)| {
254                        if source_crew == &self.id() {
255                            let encr_secret = state
256                                .shared_secrets
257                                .get(secret_kind)
258                                .ok_or("No shared secrets for that kind")?
259                                .get(&credential.pub_id().signer)
260                                .ok_or(format!("Secret not revealed for crew credential {:?} with role {}", credential, credential_role))?;
261                            let secret = credential
262                                .decrypt_secret(encr_secret)
263                                .map_err(|err| err.to_string())?;
264                            Ok(secret)
265                        } else {
266                            let parent_crew = self
267                                .get_parent_for_ancestor(&source_crew)
268                                .expect("Expected parent crew to be loaded")
269                                .clone_ref();
270                            let parent_secret = parent_crew.get_shared_secret(secret_kind)?;
271                            let entry = state
272                                .parent_secret_map
273                                .get(&(parent_crew.id(), secret_kind.to_owned(), parent_secret.id))
274                                .ok_or_else(|| {
275                                    "Expected to have entry in parent secret map".to_owned()
276                                })?;
277                            let secret = parent_secret
278                                .decrypt(entry)
279                                .map_err(|err| err.to_string())?;
280                            Ok(secret)
281                        }
282                    })
283                    .partition::<Vec<_>, _>(Result::is_ok);
284                oks.into_iter().next().unwrap_or_else(|| {
285                    Err(format!(
286                        "Couldn't find any valid credential for secret kind {}: {:?}",
287                        secret_kind, errs
288                    ))
289                })
290            }
291            None => Err("No valid state yet".to_string()),
292        }
293    }
294
295    pub fn get_entrusted_info(
296        &self,
297        secret_kind: &str,
298        info_id: &str,
299    ) -> Result<litl::Val, String> {
300        let secret = self.get_shared_secret(secret_kind)?;
301
302        match &self.current_state() {
303            Some(state) => {
304                let encr_info = state
305                    .entrusted_info
306                    .get(secret_kind)
307                    .ok_or("No entrusted infos for that secret kind")?
308                    .get(info_id)
309                    .ok_or("No such entrusted info")?;
310                let info = secret.decrypt(encr_info).map_err(|err| err.to_string())?;
311                Ok(info)
312            }
313            None => Err("No valid state yet".to_string()),
314        }
315    }
316
317    pub async fn add_update_listener(&self, listener: Listener<Rc<CrewUpdate>>) {
318        let initial_update = { self.borrow().last_update.clone() };
319        self.listeners()
320            .add_with_initial_msg(listener, initial_update)
321            .await;
322    }
323
324    pub fn updates(
325        &self,
326        listener_prefix: String,
327    ) -> impl Stream<Item = Rc<CrewUpdate>> + Unpin + 'static {
328        let (updates_tx, updates_rx) = futures::channel::mpsc::channel(100);
329
330        let self_clone = self.clone();
331
332        stream::once(async move {
333            self_clone
334                .add_update_listener(Listener::new(
335                    &format!("{}_{:?}", listener_prefix, rand::random::<u64>()),
336                    updates_tx,
337                ))
338                .await;
339
340            updates_rx
341        })
342        .flatten()
343        .boxed_local()
344    }
345
346    pub async fn handle_set_diff(&self, diff: endr::Diff) {
347        let set_diff = diff.expect_set();
348
349        if let Some(header) = &set_diff.header {
350            let meta = header.meta.clone().expect("Expected header to have meta");
351
352            assert_eq!(meta.get("type").unwrap(), &litl::Val::str("crew"));
353
354            let initial_state = litl::from_val::<CrewState>(
355                meta.get("initialState")
356                    .cloned()
357                    .expect("Expected initial state"),
358            )
359            .expect("Expected intial state to deserialize");
360
361            self.borrow_mut()
362                .unchecked_timeline
363                .set_intial_state(initial_state, MsSinceEpoch(0)); // TODO: better timestamp for header?
364        }
365
366        for new_item in &set_diff.new_items {
367            let tx = litl::from_val::<CrewChangeTx>(new_item.attested.data.clone())
368                .unwrap_or_else(|_| panic!("Expected tx to deserialize {:?}", new_item));
369
370            let mut crew_mut = self.borrow_mut();
371            let tx_id = new_item.id();
372
373            crew_mut.unchecked_timeline.insert_own_tx(tx, tx_id);
374        }
375
376        let update = self.borrow().unchecked_timeline.resolve(CrewRulesV1);
377
378        let manager = self.borrow().manager.clone();
379        for parent_id in update
380            .current_state()
381            .iter()
382            .flat_map(|state| state.parents.iter())
383        {
384            if self.borrow().parents_unchecked.contains_key(parent_id) {
385                continue;
386            }
387            let parent_crew = manager.load_crew(*parent_id).await;
388            self.borrow_mut()
389                .parents_unchecked
390                .insert(*parent_id, parent_crew.clone());
391
392            let self_bg = self.clone();
393            let parent_id = *parent_id;
394
395            self.borrow().background.add_background_task(
396                parent_crew
397                    .updates("child_crew".to_owned())
398                    .for_each(move |update| {
399                        let self_bg = self_bg.clone();
400
401                        async move {
402                            for (made_at, state) in &update.timeline {
403                                self_bg
404                                    .borrow_mut()
405                                    .unchecked_timeline
406                                    .insert_parent_state(state.clone(), parent_id, *made_at)
407                            }
408                            let update = Rc::new(self_bg.borrow().unchecked_timeline.resolve(CrewRulesV1));
409                            self_bg.borrow_mut().last_update = Some(update.clone());
410                            self_bg.listeners().broadcast(update).await;
411                        }
412                    })
413                    .boxed_local(),
414            )
415        }
416
417        let update_rc = Rc::new(update);
418
419        self.borrow_mut().last_update = Some(update_rc.clone());
420        self.listeners().broadcast(update_rc).await;
421    }
422
423    pub async fn wait_for_state(&self, condition: impl Fn(&CrewState) -> bool) {
424        let mut updates = self.updates("waiting for update".to_owned());
425
426        while let Some(update) = updates.next().await {
427            if let Some(state) = update.current_state() {
428                if condition(state) {
429                    return;
430                }
431            }
432        }
433
434        panic!("Reached end of updates stream without matching state")
435    }
436
437    pub fn is_signed_by_member_with_role<T: Clone + Serialize>(
438        &self,
439        signed: &Signed<T>,
440        role: &str,
441    ) -> Result<(), Vec<SignatureError>> {
442        let members_with_role = self
443            .current_state()
444            .expect("Expected crew state when validating signed data")
445            .roles
446            .into_iter()
447            .filter_map(|(member, member_role)| {
448                if member_role == role {
449                    Some(member)
450                } else {
451                    None
452                }
453            });
454
455        let mut errors = vec![];
456
457        for member in members_with_role {
458            match signed.ensure_signed_by(&member.signer) {
459                Ok(()) => return Ok(()),
460                Err(e) => errors.push(e),
461            }
462        }
463
464        Err(errors)
465    }
466
467    pub async fn create_invitation(&self, invitation_role: &str) -> Result<Invitation, String> {
468        let invitation_credential = MemberCredential::new_random();
469        let content_secret = self.get_shared_secret(CONTENT_SECRET).map_err(|err| {
470            format!(
471                "Expected to have content secret when creating invitation: {}",
472                err
473            )
474        })?;
475        let participation_secret = self
476            .get_shared_secret(PARTICIPATION_SECRET)
477            .map_err(|err| {
478                format!(
479                    "Expected to have participation secret when creating invitation: {}",
480                    err
481                )
482            })?;
483
484        self.make_changes([
485            CrewChange::AddRole(AddRole {
486                to: invitation_credential.pub_id(),
487                role: invitation_role.to_string(),
488            }),
489            CrewChange::RevealSecret(RevealSecret {
490                secret_kind: CONTENT_SECRET.to_owned(),
491                to: invitation_credential.signer().pub_id(),
492                encr: invitation_credential
493                    .pub_id()
494                    .recipient
495                    .encrypt_from_anon(&content_secret),
496            }),
497            CrewChange::RevealSecret(RevealSecret {
498                secret_kind: PARTICIPATION_SECRET.to_owned(),
499                to: invitation_credential.signer().pub_id(),
500                encr: invitation_credential
501                    .pub_id()
502                    .recipient
503                    .encrypt_from_anon(&participation_secret),
504            }),
505        ])
506        .await?;
507
508        Ok(Invitation {
509            crew_id: self.id(),
510            invitation_role: invitation_role.to_string(),
511            invitation_credential,
512        })
513    }
514}
515
516#[derive(Clone, Serialize, Deserialize)]
517pub struct Invitation {
518    pub crew_id: CrewID,
519    pub invitation_role: String,
520    pub invitation_credential: MemberCredential,
521}