1use std::collections::{HashMap, HashSet};
6use std::path::{Path, PathBuf};
7use std::sync::RwLock;
8
9use crate::group::{
10 GroupCreationRecord, GroupRecordError, GroupType, PrivateGroupMemberAddRecord,
11 PrivateGroupMemberRemoveRecord, PublicGroupAcceptRecord, PublicGroupInviteRecord,
12 PublicGroupLeaveRecord,
13};
14use crate::id::{GroupId, PilotId};
15use crate::util::write_json_file_atomic as write_json_file_atomic_impl;
16
17const GROUPS_DIRNAME: &str = "groups";
18const CREATIONS_DIRNAME: &str = "creations";
19const PRIVATE_ADDS_DIRNAME: &str = "private-member-adds";
20const PRIVATE_REMOVES_DIRNAME: &str = "private-member-removes";
21const PUBLIC_INVITES_DIRNAME: &str = "public-invites";
22const PUBLIC_ACCEPTS_DIRNAME: &str = "public-accepts";
23const PUBLIC_LEAVES_DIRNAME: &str = "public-leaves";
24
25#[derive(Debug, thiserror::Error)]
28pub enum GroupStoreError {
29 #[error("I/O: {0}")]
30 Io(#[from] std::io::Error),
31 #[error("JSON: {0}")]
32 Json(#[from] serde_json::Error),
33 #[error("group record: {0}")]
34 Record(#[from] GroupRecordError),
35 #[error("group {0} not found")]
36 GroupNotFound(String),
37 #[error("group {0} is not a private group")]
38 NotPrivateGroup(String),
39 #[error("group {0} is not a public group")]
40 NotPublicGroup(String),
41 #[error("signer {0} is not the group owner")]
42 NotGroupOwner(String),
43 #[error("pilot {0} is already a member of group {1}")]
44 AlreadyMember(String, String),
45 #[error("pilot {0} has no pending invitation to group {1}")]
46 NoPendingInvitation(String, String),
47 #[error("pilot {0} is not a member of group {1}")]
48 NotMember(String, String),
49 #[error("signer {0} is not an existing member of group {1}")]
50 NotExistingMember(String, String),
51 #[error("missing parent directory")]
52 MissingParentDirectory,
53 #[error("group name too long (max 128 UTF-8 characters)")]
54 NameTooLong,
55 #[error("group name {0:?} is already taken")]
56 DuplicateName(String),
57}
58
59#[derive(Debug, Clone)]
62pub struct GroupMembership {
63 pub group_id: GroupId,
64 pub group_type: GroupType,
65 pub name: Option<String>,
66 pub creator_pilot_id: PilotId,
67 pub is_owner: bool,
68}
69
70pub struct GroupStore {
73 root: PathBuf,
74 meta: RwLock<HashMap<GroupId, GroupCreationRecord>>,
76 private_members: RwLock<HashMap<GroupId, HashSet<PilotId>>>,
78 private_owned: RwLock<HashMap<PilotId, HashSet<GroupId>>>,
80 private_as_member: RwLock<HashMap<PilotId, HashSet<GroupId>>>,
82 public_members: RwLock<HashMap<GroupId, HashSet<PilotId>>>,
84 pilot_public_groups: RwLock<HashMap<PilotId, HashSet<GroupId>>>,
86 pending_invitations: RwLock<HashMap<PilotId, HashSet<GroupId>>>,
88}
89
90impl GroupStore {
91 pub fn open(root: impl Into<PathBuf>) -> Self {
92 Self {
93 root: root.into(),
94 meta: RwLock::new(HashMap::new()),
95 private_members: RwLock::new(HashMap::new()),
96 private_owned: RwLock::new(HashMap::new()),
97 private_as_member: RwLock::new(HashMap::new()),
98 public_members: RwLock::new(HashMap::new()),
99 pilot_public_groups: RwLock::new(HashMap::new()),
100 pending_invitations: RwLock::new(HashMap::new()),
101 }
102 }
103
104 pub fn for_data_dir(data_dir: impl AsRef<Path>) -> Self {
105 Self::open(data_dir.as_ref().join(GROUPS_DIRNAME))
106 }
107
108 fn init_dirs(&self) -> Result<(), GroupStoreError> {
109 let legacy_adds = self.root.join("personal-member-adds");
111 let legacy_removes = self.root.join("personal-member-removes");
112 if legacy_adds.exists() {
113 std::fs::rename(&legacy_adds, self.private_adds_dir())?;
114 }
115 if legacy_removes.exists() {
116 std::fs::rename(&legacy_removes, self.private_removes_dir())?;
117 }
118 std::fs::create_dir_all(self.creations_dir())?;
119 std::fs::create_dir_all(self.private_adds_dir())?;
120 std::fs::create_dir_all(self.private_removes_dir())?;
121 std::fs::create_dir_all(self.public_invites_dir())?;
122 std::fs::create_dir_all(self.public_accepts_dir())?;
123 std::fs::create_dir_all(self.public_leaves_dir())?;
124 Ok(())
125 }
126
127 pub fn init(&self) -> Result<(), GroupStoreError> {
129 self.init_dirs()?;
130 for record in self.load_all_json::<GroupCreationRecord>(self.creations_dir())? {
132 record.validate()?;
133 self.apply_creation(&record);
134 }
135 let mut adds = self.load_all_json::<PrivateGroupMemberAddRecord>(self.private_adds_dir())?;
136 adds.sort_by(|a, b| a.created_at.cmp(&b.created_at));
137 for record in adds {
138 record.validate()?;
139 self.apply_private_add(&record);
140 }
141 let mut removes = self.load_all_json::<PrivateGroupMemberRemoveRecord>(self.private_removes_dir())?;
142 removes.sort_by(|a, b| a.created_at.cmp(&b.created_at));
143 for record in removes {
144 record.validate()?;
145 self.apply_private_remove(&record);
146 }
147 let mut invites = self.load_all_json::<PublicGroupInviteRecord>(self.public_invites_dir())?;
148 invites.sort_by(|a, b| a.created_at.cmp(&b.created_at));
149 for record in invites {
150 record.validate()?;
151 self.apply_public_invite(&record);
152 }
153 let mut accepts = self.load_all_json::<PublicGroupAcceptRecord>(self.public_accepts_dir())?;
154 accepts.sort_by(|a, b| a.created_at.cmp(&b.created_at));
155 for record in accepts {
156 record.validate()?;
157 self.apply_public_accept(&record);
158 }
159 let mut leaves = self.load_all_json::<PublicGroupLeaveRecord>(self.public_leaves_dir())?;
160 leaves.sort_by(|a, b| a.created_at.cmp(&b.created_at));
161 for record in leaves {
162 record.validate()?;
163 self.apply_public_leave(&record);
164 }
165 Ok(())
166 }
167
168 pub fn create_group(&self, record: GroupCreationRecord) -> Result<(), GroupStoreError> {
171 self.init_dirs()?;
172 record.validate()?;
173 if let Some(name) = &record.name {
174 if name.chars().count() > 128 {
175 return Err(GroupStoreError::NameTooLong);
176 }
177 if self.lookup_by_name(name).is_some() {
178 return Err(GroupStoreError::DuplicateName(name.clone()));
179 }
180 }
181 let path = self.creations_dir().join(format!("{}.json", record.group_id.id_hex()));
182 if path.exists() {
183 return Ok(());
184 }
185 write_json_file_atomic(&path, &record)?;
186 self.apply_creation(&record);
187 Ok(())
188 }
189
190 pub fn add_private_group_member(
191 &self,
192 record: PrivateGroupMemberAddRecord,
193 ) -> Result<(), GroupStoreError> {
194 self.init_dirs()?;
195 record.validate()?;
196 self.check_private_group_owner(&record.group_id, &record.added_by_pilot_id)?;
197 if self.is_private_member(&record.group_id, &record.member_pilot_id) {
198 return Err(GroupStoreError::AlreadyMember(
199 record.member_pilot_id.to_string(),
200 record.group_id.to_string(),
201 ));
202 }
203 let path = self.private_adds_dir().join(format!("{}.json", record.record_id));
204 if !path.exists() {
205 write_json_file_atomic(&path, &record)?;
206 self.apply_private_add(&record);
207 }
208 Ok(())
209 }
210
211 pub fn remove_private_group_member(
212 &self,
213 record: PrivateGroupMemberRemoveRecord,
214 ) -> Result<(), GroupStoreError> {
215 self.init_dirs()?;
216 record.validate()?;
217 self.check_private_group_owner(&record.group_id, &record.removed_by_pilot_id)?;
218 if !self.is_private_member(&record.group_id, &record.member_pilot_id) {
219 return Err(GroupStoreError::NotMember(
220 record.member_pilot_id.to_string(),
221 record.group_id.to_string(),
222 ));
223 }
224 let path = self.private_removes_dir().join(format!("{}.json", record.record_id));
225 if !path.exists() {
226 write_json_file_atomic(&path, &record)?;
227 self.apply_private_remove(&record);
228 }
229 Ok(())
230 }
231
232 pub fn invite_to_public_group(
233 &self,
234 record: PublicGroupInviteRecord,
235 ) -> Result<(), GroupStoreError> {
236 self.init_dirs()?;
237 record.validate()?;
238 self.check_public_group_member(&record.group_id, &record.invited_by_pilot_id)?;
239 if self.is_public_member(&record.group_id, &record.invited_pilot_id) {
240 return Err(GroupStoreError::AlreadyMember(
241 record.invited_pilot_id.to_string(),
242 record.group_id.to_string(),
243 ));
244 }
245 let path = self.public_invites_dir().join(format!("{}.json", record.record_id));
246 if !path.exists() {
247 write_json_file_atomic(&path, &record)?;
248 self.apply_public_invite(&record);
249 }
250 Ok(())
251 }
252
253 pub fn accept_group_invitation(
254 &self,
255 record: PublicGroupAcceptRecord,
256 ) -> Result<(), GroupStoreError> {
257 self.init_dirs()?;
258 record.validate()?;
259 if !self.has_pending_invitation(&record.group_id, &record.member_pilot_id) {
260 return Err(GroupStoreError::NoPendingInvitation(
261 record.member_pilot_id.to_string(),
262 record.group_id.to_string(),
263 ));
264 }
265 let path = self.public_accepts_dir().join(format!("{}.json", record.record_id));
266 if !path.exists() {
267 write_json_file_atomic(&path, &record)?;
268 self.apply_public_accept(&record);
269 }
270 Ok(())
271 }
272
273 pub fn leave_group(&self, record: PublicGroupLeaveRecord) -> Result<(), GroupStoreError> {
274 self.init_dirs()?;
275 record.validate()?;
276 if !self.is_public_member(&record.group_id, &record.member_pilot_id) {
277 return Err(GroupStoreError::NotMember(
278 record.member_pilot_id.to_string(),
279 record.group_id.to_string(),
280 ));
281 }
282 let path = self.public_leaves_dir().join(format!("{}.json", record.record_id));
283 if !path.exists() {
284 write_json_file_atomic(&path, &record)?;
285 self.apply_public_leave(&record);
286 }
287 Ok(())
288 }
289
290 pub fn pilot_has_private_group_access(&self, requester: &PilotId, owner: &PilotId) -> bool {
294 let member_groups = self.private_as_member.read().unwrap();
295 let owned_groups = self.private_owned.read().unwrap();
296 if let (Some(member_set), Some(owner_set)) =
297 (member_groups.get(requester), owned_groups.get(owner))
298 {
299 return member_set.intersection(owner_set).next().is_some();
300 }
301 false
302 }
303
304 pub fn pilots_share_public_group(&self, pilot_a: &PilotId, pilot_b: &PilotId) -> bool {
306 let groups = self.pilot_public_groups.read().unwrap();
307 if let (Some(a_groups), Some(b_groups)) = (groups.get(pilot_a), groups.get(pilot_b)) {
308 return a_groups.intersection(b_groups).next().is_some();
309 }
310 false
311 }
312
313 pub fn list_pilot_groups(&self, pilot_id: &PilotId) -> Vec<GroupMembership> {
315 let meta = self.meta.read().unwrap();
316 let private_owned = self.private_owned.read().unwrap();
317 let private_as_member = self.private_as_member.read().unwrap();
318 let pilot_public = self.pilot_public_groups.read().unwrap();
319
320 let mut group_ids: HashSet<GroupId> = HashSet::new();
321 if let Some(owned) = private_owned.get(pilot_id) {
322 group_ids.extend(owned.iter().cloned());
323 }
324 if let Some(memberships) = private_as_member.get(pilot_id) {
325 group_ids.extend(memberships.iter().cloned());
326 }
327 if let Some(public) = pilot_public.get(pilot_id) {
328 group_ids.extend(public.iter().cloned());
329 }
330
331 let owned_set = private_owned.get(pilot_id).cloned().unwrap_or_default();
332
333 group_ids
334 .into_iter()
335 .filter_map(|gid| {
336 meta.get(&gid).map(|m| GroupMembership {
337 group_id: gid.clone(),
338 group_type: m.group_type.clone(),
339 name: m.name.clone(),
340 creator_pilot_id: m.creator_pilot_id.clone(),
341 is_owner: owned_set.contains(&gid),
342 })
343 })
344 .collect()
345 }
346
347 pub fn list_group_members(&self, group_id: &GroupId) -> Vec<PilotId> {
349 let public_members = self.public_members.read().unwrap();
350 public_members
351 .get(group_id)
352 .map(|s| s.iter().cloned().collect())
353 .unwrap_or_default()
354 }
355
356 pub fn list_private_group_members(&self, group_id: &GroupId) -> Vec<PilotId> {
358 let private_members = self.private_members.read().unwrap();
359 private_members
360 .get(group_id)
361 .map(|s| s.iter().cloned().collect())
362 .unwrap_or_default()
363 }
364
365 pub fn list_pending_invitations(&self, pilot_id: &PilotId) -> Vec<GroupId> {
367 let pending = self.pending_invitations.read().unwrap();
368 pending
369 .get(pilot_id)
370 .map(|s| s.iter().cloned().collect())
371 .unwrap_or_default()
372 }
373
374 pub fn group_meta(&self, group_id: &GroupId) -> Option<GroupCreationRecord> {
375 self.meta.read().unwrap().get(group_id).cloned()
376 }
377
378 pub fn lookup_by_name(&self, name: &str) -> Option<GroupCreationRecord> {
380 self.meta
381 .read()
382 .unwrap()
383 .values()
384 .find(|r| r.name.as_deref() == Some(name))
385 .cloned()
386 }
387
388 fn apply_creation(&self, record: &GroupCreationRecord) {
391 let mut meta = self.meta.write().unwrap();
392 meta.entry(record.group_id.clone())
393 .or_insert_with(|| record.clone());
394
395 match record.group_type {
396 GroupType::Private => {
397 let mut owned = self.private_owned.write().unwrap();
398 owned
399 .entry(record.creator_pilot_id.clone())
400 .or_default()
401 .insert(record.group_id.clone());
402 self.private_members
403 .write()
404 .unwrap()
405 .entry(record.group_id.clone())
406 .or_default();
407 }
408 GroupType::Public => {
409 let mut pub_members = self.public_members.write().unwrap();
411 pub_members
412 .entry(record.group_id.clone())
413 .or_default()
414 .insert(record.creator_pilot_id.clone());
415 let mut pilot_pub = self.pilot_public_groups.write().unwrap();
416 pilot_pub
417 .entry(record.creator_pilot_id.clone())
418 .or_default()
419 .insert(record.group_id.clone());
420 }
421 }
422 }
423
424 fn apply_private_add(&self, record: &PrivateGroupMemberAddRecord) {
425 self.private_members
426 .write()
427 .unwrap()
428 .entry(record.group_id.clone())
429 .or_default()
430 .insert(record.member_pilot_id.clone());
431 self.private_as_member
432 .write()
433 .unwrap()
434 .entry(record.member_pilot_id.clone())
435 .or_default()
436 .insert(record.group_id.clone());
437 }
438
439 fn apply_private_remove(&self, record: &PrivateGroupMemberRemoveRecord) {
440 if let Some(set) = self.private_members.write().unwrap().get_mut(&record.group_id) {
441 set.remove(&record.member_pilot_id);
442 }
443 if let Some(set) = self.private_as_member.write().unwrap().get_mut(&record.member_pilot_id) {
444 set.remove(&record.group_id);
445 }
446 }
447
448 fn apply_public_invite(&self, record: &PublicGroupInviteRecord) {
449 self.pending_invitations
450 .write()
451 .unwrap()
452 .entry(record.invited_pilot_id.clone())
453 .or_default()
454 .insert(record.group_id.clone());
455 }
456
457 fn apply_public_accept(&self, record: &PublicGroupAcceptRecord) {
458 if let Some(set) = self.pending_invitations.write().unwrap().get_mut(&record.member_pilot_id) {
460 set.remove(&record.group_id);
461 }
462 self.public_members
464 .write()
465 .unwrap()
466 .entry(record.group_id.clone())
467 .or_default()
468 .insert(record.member_pilot_id.clone());
469 self.pilot_public_groups
470 .write()
471 .unwrap()
472 .entry(record.member_pilot_id.clone())
473 .or_default()
474 .insert(record.group_id.clone());
475 }
476
477 fn apply_public_leave(&self, record: &PublicGroupLeaveRecord) {
478 if let Some(set) = self.public_members.write().unwrap().get_mut(&record.group_id) {
479 set.remove(&record.member_pilot_id);
480 }
481 if let Some(set) = self.pilot_public_groups.write().unwrap().get_mut(&record.member_pilot_id) {
482 set.remove(&record.group_id);
483 }
484 }
485
486 fn check_private_group_owner(
489 &self,
490 group_id: &GroupId,
491 pilot_id: &PilotId,
492 ) -> Result<(), GroupStoreError> {
493 let meta = self.meta.read().unwrap();
494 let record = meta
495 .get(group_id)
496 .ok_or_else(|| GroupStoreError::GroupNotFound(group_id.to_string()))?;
497 if record.group_type != GroupType::Private {
498 return Err(GroupStoreError::NotPrivateGroup(group_id.to_string()));
499 }
500 if &record.creator_pilot_id != pilot_id {
501 return Err(GroupStoreError::NotGroupOwner(pilot_id.to_string()));
502 }
503 Ok(())
504 }
505
506 fn check_public_group_member(
507 &self,
508 group_id: &GroupId,
509 pilot_id: &PilotId,
510 ) -> Result<(), GroupStoreError> {
511 let meta = self.meta.read().unwrap();
512 let record = meta
513 .get(group_id)
514 .ok_or_else(|| GroupStoreError::GroupNotFound(group_id.to_string()))?;
515 if record.group_type != GroupType::Public {
516 return Err(GroupStoreError::NotPublicGroup(group_id.to_string()));
517 }
518 drop(meta);
519 if !self.is_public_member(group_id, pilot_id) {
520 return Err(GroupStoreError::NotExistingMember(
521 pilot_id.to_string(),
522 group_id.to_string(),
523 ));
524 }
525 Ok(())
526 }
527
528 fn is_private_member(&self, group_id: &GroupId, pilot_id: &PilotId) -> bool {
529 self.private_members
530 .read()
531 .unwrap()
532 .get(group_id)
533 .map(|s| s.contains(pilot_id))
534 .unwrap_or(false)
535 }
536
537 fn is_public_member(&self, group_id: &GroupId, pilot_id: &PilotId) -> bool {
538 self.public_members
539 .read()
540 .unwrap()
541 .get(group_id)
542 .map(|s| s.contains(pilot_id))
543 .unwrap_or(false)
544 }
545
546 fn has_pending_invitation(&self, group_id: &GroupId, pilot_id: &PilotId) -> bool {
547 self.pending_invitations
548 .read()
549 .unwrap()
550 .get(pilot_id)
551 .map(|s| s.contains(group_id))
552 .unwrap_or(false)
553 }
554
555 fn creations_dir(&self) -> PathBuf { self.root.join(CREATIONS_DIRNAME) }
558 fn private_adds_dir(&self) -> PathBuf { self.root.join(PRIVATE_ADDS_DIRNAME) }
559 fn private_removes_dir(&self) -> PathBuf { self.root.join(PRIVATE_REMOVES_DIRNAME) }
560 fn public_invites_dir(&self) -> PathBuf { self.root.join(PUBLIC_INVITES_DIRNAME) }
561 fn public_accepts_dir(&self) -> PathBuf { self.root.join(PUBLIC_ACCEPTS_DIRNAME) }
562 fn public_leaves_dir(&self) -> PathBuf { self.root.join(PUBLIC_LEAVES_DIRNAME) }
563
564 fn load_all_json<T: serde::de::DeserializeOwned>(
567 &self,
568 dir: PathBuf,
569 ) -> Result<Vec<T>, GroupStoreError> {
570 if !dir.exists() {
571 return Ok(Vec::new());
572 }
573 let mut records = Vec::new();
574 for entry in std::fs::read_dir(&dir)?.filter_map(Result::ok) {
575 let path = entry.path();
576 if path.extension().and_then(|e| e.to_str()) != Some("json") {
577 continue;
578 }
579 let bytes = std::fs::read(&path)?;
580 records.push(serde_json::from_slice(&bytes)?);
581 }
582 Ok(records)
583 }
584}
585
586fn write_json_file_atomic<T: serde::Serialize>(
587 path: &Path,
588 value: &T,
589) -> Result<(), GroupStoreError> {
590 write_json_file_atomic_impl(
591 path,
592 value,
593 |parent| {
594 std::fs::create_dir_all(parent)?;
595 Ok(())
596 },
597 |tmp_path, bytes| {
598 std::fs::write(tmp_path, bytes)?;
599 Ok(())
600 },
601 GroupStoreError::MissingParentDirectory,
602 )
603}