1use std::{collections::{hash_map::Entry, HashMap, HashSet}, rc::Rc};
2
3use audi::{Listener, ListenerSet};
4use endr::{SetItem, SetItemID, WriteAccess};
5use futures::{future, stream, FutureExt, Stream, StreamExt};
6use litl::{impl_debug_as_litl, impl_nested_tagged_data_serde, NestedTaggedData};
7use mofo::Mofo;
8use objt::objt;
9use ridl::{
10 signing::{SignatureError, Signed},
11 symm_encr::KeySecret,
12};
13use serde::Serialize;
14use serde_derive::{Deserialize, Serialize};
15use ti64::MsSinceEpoch;
16
17use crate::{
18 credential_source::CredentialSource,
19 crew_manager::WeakCrewManager,
20 timeline::{Timeline, UncheckedTimeline, UncheckedTimelineEntry},
21 AddRole, CrewChange, CrewChangeTx, CrewManager, CrewRulesV1, CrewState, MemberCredential,
22 RevealSecret, ADMIN_INVITATION_ROLE, ADMIN_ROLE, CONTENT_SECRET, PARTICIPATION_SECRET,
23 READER_ROLE, READ_INVITATION_ROLE, SET_WRITE_ACCESS_INFO_ID, WRITER_ROLE,
24 WRITE_INVITATION_ROLE,
25};
26
27#[derive(Clone, Copy, PartialEq, Eq, Hash)]
28pub struct CrewID(pub endr::ObjectID);
29
30impl_debug_as_litl!(CrewID);
31
32impl NestedTaggedData for CrewID {
33 const TAG: &'static str = "crew";
34
35 type Inner = endr::ObjectID;
36
37 fn as_inner(&self) -> &Self::Inner {
38 &self.0
39 }
40
41 fn from_inner(inner: Self::Inner) -> Self
42 where
43 Self: Sized,
44 {
45 CrewID(inner)
46 }
47}
48
49impl_nested_tagged_data_serde!(CrewID);
50
51pub struct CrewInner {
52 pub(crate) endr: endr::Node,
53 pub(crate) id: CrewID,
54 pub(crate) unchecked_timeline: UncheckedTimeline,
55 pub(crate) last_update: Option<Rc<CrewUpdate>>,
56 pub(crate) credential_source: Box<dyn CredentialSource>,
57 pub(crate) listeners: ListenerSet<Rc<CrewUpdate>>,
58 pub(crate) manager: CrewManager,
59 pub(crate) parents_unchecked: HashMap<CrewID, Crew>,
60 pub(crate) background: Mofo,
61 pub(crate) shared_secret_cache: HashMap<String, Rc<KeySecret>>,
62}
63
64#[derive(Serialize, Deserialize)]
65pub struct CrewUpdate {
66 pub(crate) timeline: Timeline,
67 pub(crate) valid_txs: HashMap<SetItemID, CrewChangeTx>,
68 pub(crate) invalid_txs: HashMap<SetItemID, (CrewChangeTx, String)>,
69}
70impl_debug_as_litl!(CrewUpdate);
71
72impl CrewUpdate {
73 pub fn current_state(&self) -> Option<&CrewState> {
74 self.timeline.last().map(|(_, state)| state)
75 }
76}
77
78objt!(
79 Crew,
80 WeakCrew,
81 CrewInner,
82 endr: endr::Node,
83 listeners: ListenerSet<Rc<CrewUpdate>>
84);
85
86impl Crew {
87 pub fn id(&self) -> CrewID {
88 self.borrow().id
89 }
90 pub fn current_state(&self) -> Option<CrewState> {
91 self.borrow()
92 .last_update
93 .as_ref()
94 .and_then(|u| u.current_state().cloned())
95 }
96}
97
98impl Crew {
99 pub async fn make_changes<I: IntoIterator<Item = CrewChange>>(
100 &self,
101 changes: I,
102 ) -> Result<SetItemID, String> {
103 let write_access = litl::from_val::<WriteAccess>(
104 self.get_entrusted_info(PARTICIPATION_SECRET, SET_WRITE_ACCESS_INFO_ID)?,
105 )
106 .map_err(|err| err.to_string())?;
107
108 self.make_changes_with_write_access(changes, write_access)
109 .await
110 }
111
112 pub(crate) async fn make_changes_with_write_access<I: IntoIterator<Item = CrewChange>>(
113 &self,
114 changes: I,
115 write_access: WriteAccess,
116 ) -> Result<SetItemID, String> {
117 let set_id = self.borrow().id.0;
118 let endr = self.endr();
119
120 let (credential, _, _) = self
121 .get_credentials_with_role(&HashSet::from_iter([
122 ADMIN_ROLE.to_string(),
123 WRITER_ROLE.to_string(),
124 ADMIN_INVITATION_ROLE.to_string(),
125 WRITE_INVITATION_ROLE.to_string(),
126 READ_INVITATION_ROLE.to_owned(),
127 ]))
128 .first()
129 .cloned()
130 .expect("Expected credential to make changes");
131
132 let tx = credential.sign_tx(changes);
133
134 let tx_item_id = endr
135 .insert_into_set_after_frontier(set_id, &write_access, tx)
136 .await
137 .map_err(|err| err.to_string())?;
138
139 let mut updates = self.updates(format!("Waiting for {:?}", tx_item_id));
141
142 while let Some(update) = updates.next().await {
143 if update.valid_txs.contains_key(&tx_item_id) {
144 return Ok(tx_item_id);
145 } else if let Some(reason) = update
146 .invalid_txs
147 .get(&tx_item_id)
148 .map(|(_, reason)| reason)
149 {
150 return Err(format!("Tx turned out invalid: {}", reason));
151 } else {
152 }
155 }
156
157 Err("Reached end of set updates before finding new tx".to_string())
158 }
159
160 pub fn get_credentials_with_role(
161 &self,
162 preferred_role_first: &HashSet<String>,
163 ) -> Vec<(MemberCredential, String, CrewID)> {
164 let credential_source = self.borrow().credential_source.clone_ref();
165 let all_credentials = credential_source.current_credentials_for(&self.id());
166 let mut collected_credentials = Vec::new();
167
168 if let Some(current_state) = self.current_state() {
169 let parents = current_state
170 .parents
171 .iter()
172 .map(|parent_id| {
173 self.borrow()
174 .parents_unchecked
175 .get(parent_id)
176 .unwrap()
177 .clone()
178 })
179 .collect::<Vec<_>>();
180
181
182 for role in preferred_role_first {
183 for credential in &all_credentials {
184 if current_state
185 .roles_of(&credential.signer().pub_id())
186 .contains(role)
187 {
188 collected_credentials.push((credential.clone(), role.clone(), self.id()));
189 }
190
191 }
192
193 for parent in &parents {
194 collected_credentials.extend(
195 parent
196 .get_credentials_with_role(&HashSet::from_iter([role.to_owned()])),
197 );
198 }
199 }
200 }
201
202 collected_credentials
203 }
204
205 fn get_parent_for_ancestor(&self, ancestor_id: &CrewID) -> Option<Crew> {
206 let own_parents = &self.borrow().parents_unchecked;
207 own_parents.get(ancestor_id).cloned().or_else(|| {
208 own_parents.values().find_map(|parent| {
209 if parent.get_parent_for_ancestor(ancestor_id).is_some() {
210 Some(parent.clone_ref())
211 } else {
212 None
213 }
214 })
215 })
216 }
217
218 pub fn get_shared_secret(&self, secret_kind: &str) -> Result<Rc<KeySecret>, String> {
220 if let Some(secret) = self.borrow_mut().shared_secret_cache.get(secret_kind) {
221 return Ok(Rc::clone(secret));
222 }
223
224 let secret = Rc::new(self.get_shared_secret_uncached(secret_kind)?);
225 self.borrow_mut().shared_secret_cache.insert(secret_kind.to_owned(), Rc::clone(&secret));
226 Ok(secret)
227 }
228
229 fn get_shared_secret_uncached(&self, secret_kind: &str) -> Result<KeySecret, String> {
230 let credentials = match secret_kind {
231 CONTENT_SECRET => self.get_credentials_with_role(&HashSet::from_iter([
232 ADMIN_ROLE.to_owned(),
233 WRITER_ROLE.to_owned(),
234 READER_ROLE.to_owned(),
235 ADMIN_INVITATION_ROLE.to_owned(),
236 WRITE_INVITATION_ROLE.to_owned(),
237 READ_INVITATION_ROLE.to_owned(),
238 ])),
239 PARTICIPATION_SECRET => self.get_credentials_with_role(&HashSet::from_iter([
240 ADMIN_ROLE.to_owned(),
241 WRITER_ROLE.to_owned(),
242 ADMIN_INVITATION_ROLE.to_owned(),
243 WRITE_INVITATION_ROLE.to_owned(),
244 READ_INVITATION_ROLE.to_owned(),
245 ])),
246 _ => self.get_credentials_with_role(&HashSet::from_iter([ADMIN_ROLE.to_owned()])),
247 };
248
249 match &self.current_state() {
250 Some(state) => {
251 let (oks, errs) = credentials
252 .iter()
253 .map(|(credential, credential_role, source_crew)| {
254 if source_crew == &self.id() {
255 let encr_secret = state
256 .shared_secrets
257 .get(secret_kind)
258 .ok_or("No shared secrets for that kind")?
259 .get(&credential.pub_id().signer)
260 .ok_or(format!("Secret not revealed for crew credential {:?} with role {}", credential, credential_role))?;
261 let secret = credential
262 .decrypt_secret(encr_secret)
263 .map_err(|err| err.to_string())?;
264 Ok(secret)
265 } else {
266 let parent_crew = self
267 .get_parent_for_ancestor(&source_crew)
268 .expect("Expected parent crew to be loaded")
269 .clone_ref();
270 let parent_secret = parent_crew.get_shared_secret(secret_kind)?;
271 let entry = state
272 .parent_secret_map
273 .get(&(parent_crew.id(), secret_kind.to_owned(), parent_secret.id))
274 .ok_or_else(|| {
275 "Expected to have entry in parent secret map".to_owned()
276 })?;
277 let secret = parent_secret
278 .decrypt(entry)
279 .map_err(|err| err.to_string())?;
280 Ok(secret)
281 }
282 })
283 .partition::<Vec<_>, _>(Result::is_ok);
284 oks.into_iter().next().unwrap_or_else(|| {
285 Err(format!(
286 "Couldn't find any valid credential for secret kind {}: {:?}",
287 secret_kind, errs
288 ))
289 })
290 }
291 None => Err("No valid state yet".to_string()),
292 }
293 }
294
295 pub fn get_entrusted_info(
296 &self,
297 secret_kind: &str,
298 info_id: &str,
299 ) -> Result<litl::Val, String> {
300 let secret = self.get_shared_secret(secret_kind)?;
301
302 match &self.current_state() {
303 Some(state) => {
304 let encr_info = state
305 .entrusted_info
306 .get(secret_kind)
307 .ok_or("No entrusted infos for that secret kind")?
308 .get(info_id)
309 .ok_or("No such entrusted info")?;
310 let info = secret.decrypt(encr_info).map_err(|err| err.to_string())?;
311 Ok(info)
312 }
313 None => Err("No valid state yet".to_string()),
314 }
315 }
316
317 pub async fn add_update_listener(&self, listener: Listener<Rc<CrewUpdate>>) {
318 let initial_update = { self.borrow().last_update.clone() };
319 self.listeners()
320 .add_with_initial_msg(listener, initial_update)
321 .await;
322 }
323
324 pub fn updates(
325 &self,
326 listener_prefix: String,
327 ) -> impl Stream<Item = Rc<CrewUpdate>> + Unpin + 'static {
328 let (updates_tx, updates_rx) = futures::channel::mpsc::channel(100);
329
330 let self_clone = self.clone();
331
332 stream::once(async move {
333 self_clone
334 .add_update_listener(Listener::new(
335 &format!("{}_{:?}", listener_prefix, rand::random::<u64>()),
336 updates_tx,
337 ))
338 .await;
339
340 updates_rx
341 })
342 .flatten()
343 .boxed_local()
344 }
345
346 pub async fn handle_set_diff(&self, diff: endr::Diff) {
347 let set_diff = diff.expect_set();
348
349 if let Some(header) = &set_diff.header {
350 let meta = header.meta.clone().expect("Expected header to have meta");
351
352 assert_eq!(meta.get("type").unwrap(), &litl::Val::str("crew"));
353
354 let initial_state = litl::from_val::<CrewState>(
355 meta.get("initialState")
356 .cloned()
357 .expect("Expected initial state"),
358 )
359 .expect("Expected intial state to deserialize");
360
361 self.borrow_mut()
362 .unchecked_timeline
363 .set_intial_state(initial_state, MsSinceEpoch(0)); }
365
366 for new_item in &set_diff.new_items {
367 let tx = litl::from_val::<CrewChangeTx>(new_item.attested.data.clone())
368 .unwrap_or_else(|_| panic!("Expected tx to deserialize {:?}", new_item));
369
370 let mut crew_mut = self.borrow_mut();
371 let tx_id = new_item.id();
372
373 crew_mut.unchecked_timeline.insert_own_tx(tx, tx_id);
374 }
375
376 let update = self.borrow().unchecked_timeline.resolve(CrewRulesV1);
377
378 let manager = self.borrow().manager.clone();
379 for parent_id in update
380 .current_state()
381 .iter()
382 .flat_map(|state| state.parents.iter())
383 {
384 if self.borrow().parents_unchecked.contains_key(parent_id) {
385 continue;
386 }
387 let parent_crew = manager.load_crew(*parent_id).await;
388 self.borrow_mut()
389 .parents_unchecked
390 .insert(*parent_id, parent_crew.clone());
391
392 let self_bg = self.clone();
393 let parent_id = *parent_id;
394
395 self.borrow().background.add_background_task(
396 parent_crew
397 .updates("child_crew".to_owned())
398 .for_each(move |update| {
399 let self_bg = self_bg.clone();
400
401 async move {
402 for (made_at, state) in &update.timeline {
403 self_bg
404 .borrow_mut()
405 .unchecked_timeline
406 .insert_parent_state(state.clone(), parent_id, *made_at)
407 }
408 let update = Rc::new(self_bg.borrow().unchecked_timeline.resolve(CrewRulesV1));
409 self_bg.borrow_mut().last_update = Some(update.clone());
410 self_bg.listeners().broadcast(update).await;
411 }
412 })
413 .boxed_local(),
414 )
415 }
416
417 let update_rc = Rc::new(update);
418
419 self.borrow_mut().last_update = Some(update_rc.clone());
420 self.listeners().broadcast(update_rc).await;
421 }
422
423 pub async fn wait_for_state(&self, condition: impl Fn(&CrewState) -> bool) {
424 let mut updates = self.updates("waiting for update".to_owned());
425
426 while let Some(update) = updates.next().await {
427 if let Some(state) = update.current_state() {
428 if condition(state) {
429 return;
430 }
431 }
432 }
433
434 panic!("Reached end of updates stream without matching state")
435 }
436
437 pub fn is_signed_by_member_with_role<T: Clone + Serialize>(
438 &self,
439 signed: &Signed<T>,
440 role: &str,
441 ) -> Result<(), Vec<SignatureError>> {
442 let members_with_role = self
443 .current_state()
444 .expect("Expected crew state when validating signed data")
445 .roles
446 .into_iter()
447 .filter_map(|(member, member_role)| {
448 if member_role == role {
449 Some(member)
450 } else {
451 None
452 }
453 });
454
455 let mut errors = vec![];
456
457 for member in members_with_role {
458 match signed.ensure_signed_by(&member.signer) {
459 Ok(()) => return Ok(()),
460 Err(e) => errors.push(e),
461 }
462 }
463
464 Err(errors)
465 }
466
467 pub async fn create_invitation(&self, invitation_role: &str) -> Result<Invitation, String> {
468 let invitation_credential = MemberCredential::new_random();
469 let content_secret = self.get_shared_secret(CONTENT_SECRET).map_err(|err| {
470 format!(
471 "Expected to have content secret when creating invitation: {}",
472 err
473 )
474 })?;
475 let participation_secret = self
476 .get_shared_secret(PARTICIPATION_SECRET)
477 .map_err(|err| {
478 format!(
479 "Expected to have participation secret when creating invitation: {}",
480 err
481 )
482 })?;
483
484 self.make_changes([
485 CrewChange::AddRole(AddRole {
486 to: invitation_credential.pub_id(),
487 role: invitation_role.to_string(),
488 }),
489 CrewChange::RevealSecret(RevealSecret {
490 secret_kind: CONTENT_SECRET.to_owned(),
491 to: invitation_credential.signer().pub_id(),
492 encr: invitation_credential
493 .pub_id()
494 .recipient
495 .encrypt_from_anon(&content_secret),
496 }),
497 CrewChange::RevealSecret(RevealSecret {
498 secret_kind: PARTICIPATION_SECRET.to_owned(),
499 to: invitation_credential.signer().pub_id(),
500 encr: invitation_credential
501 .pub_id()
502 .recipient
503 .encrypt_from_anon(&participation_secret),
504 }),
505 ])
506 .await?;
507
508 Ok(Invitation {
509 crew_id: self.id(),
510 invitation_role: invitation_role.to_string(),
511 invitation_credential,
512 })
513 }
514}
515
516#[derive(Clone, Serialize, Deserialize)]
517pub struct Invitation {
518 pub crew_id: CrewID,
519 pub invitation_role: String,
520 pub invitation_credential: MemberCredential,
521}