Skip to main content

igc_net/
group_store.rs

1//! Persistent group membership store with in-memory caches.
2//!
3//! Implements the storage layout described in `specs/75-groups-and-social.md §8`.
4
5use std::collections::{HashMap, HashSet};
6use std::path::{Path, PathBuf};
7use std::sync::RwLock;
8
9use crate::group::{
10    GroupCreationRecord, GroupRecordError, GroupType, PrivateGroupMemberAddRecord,
11    PrivateGroupMemberRemoveRecord, PublicGroupAcceptRecord, PublicGroupInviteRecord,
12    PublicGroupLeaveRecord,
13};
14use crate::id::{GroupId, PilotId};
15use crate::util::write_json_file_atomic as write_json_file_atomic_impl;
16
/// Sub-directory of the data directory that holds every group record.
const GROUPS_DIRNAME: &str = "groups";
/// Group creation records, one JSON file per group.
const CREATIONS_DIRNAME: &str = "creations";
/// Private-group "member added" records.
const PRIVATE_ADDS_DIRNAME: &str = "private-member-adds";
/// Private-group "member removed" records.
const PRIVATE_REMOVES_DIRNAME: &str = "private-member-removes";
/// Public-group invitation records.
const PUBLIC_INVITES_DIRNAME: &str = "public-invites";
/// Public-group invitation-accept records.
const PUBLIC_ACCEPTS_DIRNAME: &str = "public-accepts";
/// Public-group leave records.
const PUBLIC_LEAVES_DIRNAME: &str = "public-leaves";
24
25// ── Error ─────────────────────────────────────────────────────────────────────
26
27#[derive(Debug, thiserror::Error)]
28pub enum GroupStoreError {
29    #[error("I/O: {0}")]
30    Io(#[from] std::io::Error),
31    #[error("JSON: {0}")]
32    Json(#[from] serde_json::Error),
33    #[error("group record: {0}")]
34    Record(#[from] GroupRecordError),
35    #[error("group {0} not found")]
36    GroupNotFound(String),
37    #[error("group {0} is not a private group")]
38    NotPrivateGroup(String),
39    #[error("group {0} is not a public group")]
40    NotPublicGroup(String),
41    #[error("signer {0} is not the group owner")]
42    NotGroupOwner(String),
43    #[error("pilot {0} is already a member of group {1}")]
44    AlreadyMember(String, String),
45    #[error("pilot {0} has no pending invitation to group {1}")]
46    NoPendingInvitation(String, String),
47    #[error("pilot {0} is not a member of group {1}")]
48    NotMember(String, String),
49    #[error("signer {0} is not an existing member of group {1}")]
50    NotExistingMember(String, String),
51    #[error("missing parent directory")]
52    MissingParentDirectory,
53    #[error("group name too long (max 128 UTF-8 characters)")]
54    NameTooLong,
55    #[error("group name {0:?} is already taken")]
56    DuplicateName(String),
57}
58
59// ── GroupMembership (returned by list_pilot_groups) ───────────────────────────
60
61#[derive(Debug, Clone)]
62pub struct GroupMembership {
63    pub group_id: GroupId,
64    pub group_type: GroupType,
65    pub name: Option<String>,
66    pub creator_pilot_id: PilotId,
67    pub is_owner: bool,
68}
69
70// ── GroupStore ────────────────────────────────────────────────────────────────
71
72pub struct GroupStore {
73    root: PathBuf,
74    /// group_id → GroupCreationRecord
75    meta: RwLock<HashMap<GroupId, GroupCreationRecord>>,
76    /// private group_id → set of member PilotIds (excludes the owner)
77    private_members: RwLock<HashMap<GroupId, HashSet<PilotId>>>,
78    /// PilotId → private group_ids where the pilot is the owner
79    private_owned: RwLock<HashMap<PilotId, HashSet<GroupId>>>,
80    /// PilotId → private group_ids where the pilot is a member (not owner)
81    private_as_member: RwLock<HashMap<PilotId, HashSet<GroupId>>>,
82    /// public group_id → set of full members (have accepted)
83    public_members: RwLock<HashMap<GroupId, HashSet<PilotId>>>,
84    /// PilotId → public group_ids where pilot is a full member
85    pilot_public_groups: RwLock<HashMap<PilotId, HashSet<GroupId>>>,
86    /// PilotId → public group_ids where pilot has a pending invitation
87    pending_invitations: RwLock<HashMap<PilotId, HashSet<GroupId>>>,
88}
89
90impl GroupStore {
91    pub fn open(root: impl Into<PathBuf>) -> Self {
92        Self {
93            root: root.into(),
94            meta: RwLock::new(HashMap::new()),
95            private_members: RwLock::new(HashMap::new()),
96            private_owned: RwLock::new(HashMap::new()),
97            private_as_member: RwLock::new(HashMap::new()),
98            public_members: RwLock::new(HashMap::new()),
99            pilot_public_groups: RwLock::new(HashMap::new()),
100            pending_invitations: RwLock::new(HashMap::new()),
101        }
102    }
103
104    pub fn for_data_dir(data_dir: impl AsRef<Path>) -> Self {
105        Self::open(data_dir.as_ref().join(GROUPS_DIRNAME))
106    }
107
108    fn init_dirs(&self) -> Result<(), GroupStoreError> {
109        // One-shot migration: rename legacy personal-* directories to private-*.
110        let legacy_adds = self.root.join("personal-member-adds");
111        let legacy_removes = self.root.join("personal-member-removes");
112        if legacy_adds.exists() {
113            std::fs::rename(&legacy_adds, self.private_adds_dir())?;
114        }
115        if legacy_removes.exists() {
116            std::fs::rename(&legacy_removes, self.private_removes_dir())?;
117        }
118        std::fs::create_dir_all(self.creations_dir())?;
119        std::fs::create_dir_all(self.private_adds_dir())?;
120        std::fs::create_dir_all(self.private_removes_dir())?;
121        std::fs::create_dir_all(self.public_invites_dir())?;
122        std::fs::create_dir_all(self.public_accepts_dir())?;
123        std::fs::create_dir_all(self.public_leaves_dir())?;
124        Ok(())
125    }
126
127    /// Load all persisted records into the in-memory cache.  Call once at startup.
128    pub fn init(&self) -> Result<(), GroupStoreError> {
129        self.init_dirs()?;
130        // Process in dependency order: creations → adds → removes → invites → accepts → leaves.
131        for record in self.load_all_json::<GroupCreationRecord>(self.creations_dir())? {
132            record.validate()?;
133            self.apply_creation(&record);
134        }
135        let mut adds = self.load_all_json::<PrivateGroupMemberAddRecord>(self.private_adds_dir())?;
136        adds.sort_by(|a, b| a.created_at.cmp(&b.created_at));
137        for record in adds {
138            record.validate()?;
139            self.apply_private_add(&record);
140        }
141        let mut removes = self.load_all_json::<PrivateGroupMemberRemoveRecord>(self.private_removes_dir())?;
142        removes.sort_by(|a, b| a.created_at.cmp(&b.created_at));
143        for record in removes {
144            record.validate()?;
145            self.apply_private_remove(&record);
146        }
147        let mut invites = self.load_all_json::<PublicGroupInviteRecord>(self.public_invites_dir())?;
148        invites.sort_by(|a, b| a.created_at.cmp(&b.created_at));
149        for record in invites {
150            record.validate()?;
151            self.apply_public_invite(&record);
152        }
153        let mut accepts = self.load_all_json::<PublicGroupAcceptRecord>(self.public_accepts_dir())?;
154        accepts.sort_by(|a, b| a.created_at.cmp(&b.created_at));
155        for record in accepts {
156            record.validate()?;
157            self.apply_public_accept(&record);
158        }
159        let mut leaves = self.load_all_json::<PublicGroupLeaveRecord>(self.public_leaves_dir())?;
160        leaves.sort_by(|a, b| a.created_at.cmp(&b.created_at));
161        for record in leaves {
162            record.validate()?;
163            self.apply_public_leave(&record);
164        }
165        Ok(())
166    }
167
168    // ── Write operations ──────────────────────────────────────────────────────
169
170    pub fn create_group(&self, record: GroupCreationRecord) -> Result<(), GroupStoreError> {
171        self.init_dirs()?;
172        record.validate()?;
173        if let Some(name) = &record.name {
174            if name.chars().count() > 128 {
175                return Err(GroupStoreError::NameTooLong);
176            }
177            if self.lookup_by_name(name).is_some() {
178                return Err(GroupStoreError::DuplicateName(name.clone()));
179            }
180        }
181        let path = self.creations_dir().join(format!("{}.json", record.group_id.id_hex()));
182        if path.exists() {
183            return Ok(());
184        }
185        write_json_file_atomic(&path, &record)?;
186        self.apply_creation(&record);
187        Ok(())
188    }
189
190    pub fn add_private_group_member(
191        &self,
192        record: PrivateGroupMemberAddRecord,
193    ) -> Result<(), GroupStoreError> {
194        self.init_dirs()?;
195        record.validate()?;
196        self.check_private_group_owner(&record.group_id, &record.added_by_pilot_id)?;
197        if self.is_private_member(&record.group_id, &record.member_pilot_id) {
198            return Err(GroupStoreError::AlreadyMember(
199                record.member_pilot_id.to_string(),
200                record.group_id.to_string(),
201            ));
202        }
203        let path = self.private_adds_dir().join(format!("{}.json", record.record_id));
204        if !path.exists() {
205            write_json_file_atomic(&path, &record)?;
206            self.apply_private_add(&record);
207        }
208        Ok(())
209    }
210
211    pub fn remove_private_group_member(
212        &self,
213        record: PrivateGroupMemberRemoveRecord,
214    ) -> Result<(), GroupStoreError> {
215        self.init_dirs()?;
216        record.validate()?;
217        self.check_private_group_owner(&record.group_id, &record.removed_by_pilot_id)?;
218        if !self.is_private_member(&record.group_id, &record.member_pilot_id) {
219            return Err(GroupStoreError::NotMember(
220                record.member_pilot_id.to_string(),
221                record.group_id.to_string(),
222            ));
223        }
224        let path = self.private_removes_dir().join(format!("{}.json", record.record_id));
225        if !path.exists() {
226            write_json_file_atomic(&path, &record)?;
227            self.apply_private_remove(&record);
228        }
229        Ok(())
230    }
231
232    pub fn invite_to_public_group(
233        &self,
234        record: PublicGroupInviteRecord,
235    ) -> Result<(), GroupStoreError> {
236        self.init_dirs()?;
237        record.validate()?;
238        self.check_public_group_member(&record.group_id, &record.invited_by_pilot_id)?;
239        if self.is_public_member(&record.group_id, &record.invited_pilot_id) {
240            return Err(GroupStoreError::AlreadyMember(
241                record.invited_pilot_id.to_string(),
242                record.group_id.to_string(),
243            ));
244        }
245        let path = self.public_invites_dir().join(format!("{}.json", record.record_id));
246        if !path.exists() {
247            write_json_file_atomic(&path, &record)?;
248            self.apply_public_invite(&record);
249        }
250        Ok(())
251    }
252
253    pub fn accept_group_invitation(
254        &self,
255        record: PublicGroupAcceptRecord,
256    ) -> Result<(), GroupStoreError> {
257        self.init_dirs()?;
258        record.validate()?;
259        if !self.has_pending_invitation(&record.group_id, &record.member_pilot_id) {
260            return Err(GroupStoreError::NoPendingInvitation(
261                record.member_pilot_id.to_string(),
262                record.group_id.to_string(),
263            ));
264        }
265        let path = self.public_accepts_dir().join(format!("{}.json", record.record_id));
266        if !path.exists() {
267            write_json_file_atomic(&path, &record)?;
268            self.apply_public_accept(&record);
269        }
270        Ok(())
271    }
272
273    pub fn leave_group(&self, record: PublicGroupLeaveRecord) -> Result<(), GroupStoreError> {
274        self.init_dirs()?;
275        record.validate()?;
276        if !self.is_public_member(&record.group_id, &record.member_pilot_id) {
277            return Err(GroupStoreError::NotMember(
278                record.member_pilot_id.to_string(),
279                record.group_id.to_string(),
280            ));
281        }
282        let path = self.public_leaves_dir().join(format!("{}.json", record.record_id));
283        if !path.exists() {
284            write_json_file_atomic(&path, &record)?;
285            self.apply_public_leave(&record);
286        }
287        Ok(())
288    }
289
290    // ── Query operations ──────────────────────────────────────────────────────
291
292    /// True when `requester` is a member of any private group owned by `owner`.
293    pub fn pilot_has_private_group_access(&self, requester: &PilotId, owner: &PilotId) -> bool {
294        let member_groups = self.private_as_member.read().unwrap();
295        let owned_groups = self.private_owned.read().unwrap();
296        if let (Some(member_set), Some(owner_set)) =
297            (member_groups.get(requester), owned_groups.get(owner))
298        {
299            return member_set.intersection(owner_set).next().is_some();
300        }
301        false
302    }
303
304    /// True when both pilots are confirmed members of at least one common public group.
305    pub fn pilots_share_public_group(&self, pilot_a: &PilotId, pilot_b: &PilotId) -> bool {
306        let groups = self.pilot_public_groups.read().unwrap();
307        if let (Some(a_groups), Some(b_groups)) = (groups.get(pilot_a), groups.get(pilot_b)) {
308            return a_groups.intersection(b_groups).next().is_some();
309        }
310        false
311    }
312
313    /// List all groups a pilot is associated with (owned, member-of, or public member).
314    pub fn list_pilot_groups(&self, pilot_id: &PilotId) -> Vec<GroupMembership> {
315        let meta = self.meta.read().unwrap();
316        let private_owned = self.private_owned.read().unwrap();
317        let private_as_member = self.private_as_member.read().unwrap();
318        let pilot_public = self.pilot_public_groups.read().unwrap();
319
320        let mut group_ids: HashSet<GroupId> = HashSet::new();
321        if let Some(owned) = private_owned.get(pilot_id) {
322            group_ids.extend(owned.iter().cloned());
323        }
324        if let Some(memberships) = private_as_member.get(pilot_id) {
325            group_ids.extend(memberships.iter().cloned());
326        }
327        if let Some(public) = pilot_public.get(pilot_id) {
328            group_ids.extend(public.iter().cloned());
329        }
330
331        let owned_set = private_owned.get(pilot_id).cloned().unwrap_or_default();
332
333        group_ids
334            .into_iter()
335            .filter_map(|gid| {
336                meta.get(&gid).map(|m| GroupMembership {
337                    group_id: gid.clone(),
338                    group_type: m.group_type.clone(),
339                    name: m.name.clone(),
340                    creator_pilot_id: m.creator_pilot_id.clone(),
341                    is_owner: owned_set.contains(&gid),
342                })
343            })
344            .collect()
345    }
346
347    /// List all confirmed members of a public group.
348    pub fn list_group_members(&self, group_id: &GroupId) -> Vec<PilotId> {
349        let public_members = self.public_members.read().unwrap();
350        public_members
351            .get(group_id)
352            .map(|s| s.iter().cloned().collect())
353            .unwrap_or_default()
354    }
355
356    /// List all private-group members added by `owner`.
357    pub fn list_private_group_members(&self, group_id: &GroupId) -> Vec<PilotId> {
358        let private_members = self.private_members.read().unwrap();
359        private_members
360            .get(group_id)
361            .map(|s| s.iter().cloned().collect())
362            .unwrap_or_default()
363    }
364
365    /// List public groups with a pending invitation for `pilot_id`.
366    pub fn list_pending_invitations(&self, pilot_id: &PilotId) -> Vec<GroupId> {
367        let pending = self.pending_invitations.read().unwrap();
368        pending
369            .get(pilot_id)
370            .map(|s| s.iter().cloned().collect())
371            .unwrap_or_default()
372    }
373
374    pub fn group_meta(&self, group_id: &GroupId) -> Option<GroupCreationRecord> {
375        self.meta.read().unwrap().get(group_id).cloned()
376    }
377
378    /// Find a group by its exact name (case-sensitive). Returns `None` if no group matches.
379    pub fn lookup_by_name(&self, name: &str) -> Option<GroupCreationRecord> {
380        self.meta
381            .read()
382            .unwrap()
383            .values()
384            .find(|r| r.name.as_deref() == Some(name))
385            .cloned()
386    }
387
388    // ── Private apply helpers (update caches) ─────────────────────────────────
389
390    fn apply_creation(&self, record: &GroupCreationRecord) {
391        let mut meta = self.meta.write().unwrap();
392        meta.entry(record.group_id.clone())
393            .or_insert_with(|| record.clone());
394
395        match record.group_type {
396            GroupType::Private => {
397                let mut owned = self.private_owned.write().unwrap();
398                owned
399                    .entry(record.creator_pilot_id.clone())
400                    .or_default()
401                    .insert(record.group_id.clone());
402                self.private_members
403                    .write()
404                    .unwrap()
405                    .entry(record.group_id.clone())
406                    .or_default();
407            }
408            GroupType::Public => {
409                // Creator is automatically a full member of the public group.
410                let mut pub_members = self.public_members.write().unwrap();
411                pub_members
412                    .entry(record.group_id.clone())
413                    .or_default()
414                    .insert(record.creator_pilot_id.clone());
415                let mut pilot_pub = self.pilot_public_groups.write().unwrap();
416                pilot_pub
417                    .entry(record.creator_pilot_id.clone())
418                    .or_default()
419                    .insert(record.group_id.clone());
420            }
421        }
422    }
423
424    fn apply_private_add(&self, record: &PrivateGroupMemberAddRecord) {
425        self.private_members
426            .write()
427            .unwrap()
428            .entry(record.group_id.clone())
429            .or_default()
430            .insert(record.member_pilot_id.clone());
431        self.private_as_member
432            .write()
433            .unwrap()
434            .entry(record.member_pilot_id.clone())
435            .or_default()
436            .insert(record.group_id.clone());
437    }
438
439    fn apply_private_remove(&self, record: &PrivateGroupMemberRemoveRecord) {
440        if let Some(set) = self.private_members.write().unwrap().get_mut(&record.group_id) {
441            set.remove(&record.member_pilot_id);
442        }
443        if let Some(set) = self.private_as_member.write().unwrap().get_mut(&record.member_pilot_id) {
444            set.remove(&record.group_id);
445        }
446    }
447
448    fn apply_public_invite(&self, record: &PublicGroupInviteRecord) {
449        self.pending_invitations
450            .write()
451            .unwrap()
452            .entry(record.invited_pilot_id.clone())
453            .or_default()
454            .insert(record.group_id.clone());
455    }
456
457    fn apply_public_accept(&self, record: &PublicGroupAcceptRecord) {
458        // Remove from pending.
459        if let Some(set) = self.pending_invitations.write().unwrap().get_mut(&record.member_pilot_id) {
460            set.remove(&record.group_id);
461        }
462        // Add to full members.
463        self.public_members
464            .write()
465            .unwrap()
466            .entry(record.group_id.clone())
467            .or_default()
468            .insert(record.member_pilot_id.clone());
469        self.pilot_public_groups
470            .write()
471            .unwrap()
472            .entry(record.member_pilot_id.clone())
473            .or_default()
474            .insert(record.group_id.clone());
475    }
476
477    fn apply_public_leave(&self, record: &PublicGroupLeaveRecord) {
478        if let Some(set) = self.public_members.write().unwrap().get_mut(&record.group_id) {
479            set.remove(&record.member_pilot_id);
480        }
481        if let Some(set) = self.pilot_public_groups.write().unwrap().get_mut(&record.member_pilot_id) {
482            set.remove(&record.group_id);
483        }
484    }
485
486    // ── Validation helpers ────────────────────────────────────────────────────
487
488    fn check_private_group_owner(
489        &self,
490        group_id: &GroupId,
491        pilot_id: &PilotId,
492    ) -> Result<(), GroupStoreError> {
493        let meta = self.meta.read().unwrap();
494        let record = meta
495            .get(group_id)
496            .ok_or_else(|| GroupStoreError::GroupNotFound(group_id.to_string()))?;
497        if record.group_type != GroupType::Private {
498            return Err(GroupStoreError::NotPrivateGroup(group_id.to_string()));
499        }
500        if &record.creator_pilot_id != pilot_id {
501            return Err(GroupStoreError::NotGroupOwner(pilot_id.to_string()));
502        }
503        Ok(())
504    }
505
506    fn check_public_group_member(
507        &self,
508        group_id: &GroupId,
509        pilot_id: &PilotId,
510    ) -> Result<(), GroupStoreError> {
511        let meta = self.meta.read().unwrap();
512        let record = meta
513            .get(group_id)
514            .ok_or_else(|| GroupStoreError::GroupNotFound(group_id.to_string()))?;
515        if record.group_type != GroupType::Public {
516            return Err(GroupStoreError::NotPublicGroup(group_id.to_string()));
517        }
518        drop(meta);
519        if !self.is_public_member(group_id, pilot_id) {
520            return Err(GroupStoreError::NotExistingMember(
521                pilot_id.to_string(),
522                group_id.to_string(),
523            ));
524        }
525        Ok(())
526    }
527
528    fn is_private_member(&self, group_id: &GroupId, pilot_id: &PilotId) -> bool {
529        self.private_members
530            .read()
531            .unwrap()
532            .get(group_id)
533            .map(|s| s.contains(pilot_id))
534            .unwrap_or(false)
535    }
536
537    fn is_public_member(&self, group_id: &GroupId, pilot_id: &PilotId) -> bool {
538        self.public_members
539            .read()
540            .unwrap()
541            .get(group_id)
542            .map(|s| s.contains(pilot_id))
543            .unwrap_or(false)
544    }
545
546    fn has_pending_invitation(&self, group_id: &GroupId, pilot_id: &PilotId) -> bool {
547        self.pending_invitations
548            .read()
549            .unwrap()
550            .get(pilot_id)
551            .map(|s| s.contains(group_id))
552            .unwrap_or(false)
553    }
554
555    // ── Directory paths ───────────────────────────────────────────────────────
556
557    fn creations_dir(&self) -> PathBuf { self.root.join(CREATIONS_DIRNAME) }
558    fn private_adds_dir(&self) -> PathBuf { self.root.join(PRIVATE_ADDS_DIRNAME) }
559    fn private_removes_dir(&self) -> PathBuf { self.root.join(PRIVATE_REMOVES_DIRNAME) }
560    fn public_invites_dir(&self) -> PathBuf { self.root.join(PUBLIC_INVITES_DIRNAME) }
561    fn public_accepts_dir(&self) -> PathBuf { self.root.join(PUBLIC_ACCEPTS_DIRNAME) }
562    fn public_leaves_dir(&self) -> PathBuf { self.root.join(PUBLIC_LEAVES_DIRNAME) }
563
564    // ── Generic file helpers ──────────────────────────────────────────────────
565
566    fn load_all_json<T: serde::de::DeserializeOwned>(
567        &self,
568        dir: PathBuf,
569    ) -> Result<Vec<T>, GroupStoreError> {
570        if !dir.exists() {
571            return Ok(Vec::new());
572        }
573        let mut records = Vec::new();
574        for entry in std::fs::read_dir(&dir)?.filter_map(Result::ok) {
575            let path = entry.path();
576            if path.extension().and_then(|e| e.to_str()) != Some("json") {
577                continue;
578            }
579            let bytes = std::fs::read(&path)?;
580            records.push(serde_json::from_slice(&bytes)?);
581        }
582        Ok(records)
583    }
584}
585
586fn write_json_file_atomic<T: serde::Serialize>(
587    path: &Path,
588    value: &T,
589) -> Result<(), GroupStoreError> {
590    write_json_file_atomic_impl(
591        path,
592        value,
593        |parent| {
594            std::fs::create_dir_all(parent)?;
595            Ok(())
596        },
597        |tmp_path, bytes| {
598            std::fs::write(tmp_path, bytes)?;
599            Ok(())
600        },
601        GroupStoreError::MissingParentDirectory,
602    )
603}