Skip to main content

igc_net/
follow_store.rs

//! Persistent follow/unfollow store with in-memory caches.
//!
//! Implements the follow storage layout described in `specs/70-groups-and-social.md §8`.
4
5use std::collections::{HashMap, HashSet};
6use std::path::{Path, PathBuf};
7use std::sync::RwLock;
8
9use crate::follow::{FollowRecord, FollowRecordError, UnfollowRecord};
10use crate::id::PilotId;
11use crate::util::write_json_file_atomic as write_json_file_atomic_impl;
12
13const FOLLOWS_DIRNAME: &str = "follows";
14const FOLLOW_RECORDS_DIRNAME: &str = "follow-records";
15const UNFOLLOW_RECORDS_DIRNAME: &str = "unfollow-records";
16
17// ── Error ─────────────────────────────────────────────────────────────────────
18
19#[derive(Debug, thiserror::Error)]
20pub enum FollowStoreError {
21    #[error("I/O: {0}")]
22    Io(#[from] std::io::Error),
23    #[error("JSON: {0}")]
24    Json(#[from] serde_json::Error),
25    #[error("follow record: {0}")]
26    Record(#[from] FollowRecordError),
27    #[error("pilot {0} is already following {1}")]
28    AlreadyFollowing(String, String),
29    #[error("pilot {0} is not following {1}")]
30    NotFollowing(String, String),
31    #[error("missing parent directory")]
32    MissingParentDirectory,
33}
34
35// ── FollowStore ───────────────────────────────────────────────────────────────
36
37pub struct FollowStore {
38    root: PathBuf,
39    /// follower → set of followees
40    following: RwLock<HashMap<PilotId, HashSet<PilotId>>>,
41    /// followee → set of followers
42    followers: RwLock<HashMap<PilotId, HashSet<PilotId>>>,
43}
44
45impl FollowStore {
46    pub fn open(root: impl Into<PathBuf>) -> Self {
47        Self {
48            root: root.into(),
49            following: RwLock::new(HashMap::new()),
50            followers: RwLock::new(HashMap::new()),
51        }
52    }
53
54    pub fn for_data_dir(data_dir: impl AsRef<Path>) -> Self {
55        Self::open(data_dir.as_ref().join(FOLLOWS_DIRNAME))
56    }
57
58    fn init_dirs(&self) -> Result<(), FollowStoreError> {
59        std::fs::create_dir_all(self.follow_records_dir())?;
60        std::fs::create_dir_all(self.unfollow_records_dir())?;
61        Ok(())
62    }
63
64    /// Load all persisted records into the in-memory cache.  Call once at startup.
65    pub fn init(&self) -> Result<(), FollowStoreError> {
66        self.init_dirs()?;
67        let mut follows = self.load_all_json::<FollowRecord>(self.follow_records_dir())?;
68        follows.sort_by(|a, b| a.created_at.cmp(&b.created_at));
69        for r in follows {
70            r.validate()?;
71            self.apply_follow(&r);
72        }
73        let mut unfollow = self.load_all_json::<UnfollowRecord>(self.unfollow_records_dir())?;
74        unfollow.sort_by(|a, b| a.created_at.cmp(&b.created_at));
75        for r in unfollow {
76            r.validate()?;
77            self.apply_unfollow(&r);
78        }
79        Ok(())
80    }
81
82    // ── Write operations ──────────────────────────────────────────────────────
83
84    pub fn follow_pilot(&self, record: FollowRecord) -> Result<(), FollowStoreError> {
85        self.init_dirs()?;
86        record.validate()?;
87        if self.is_following(&record.follower_pilot_id, &record.followee_pilot_id) {
88            return Err(FollowStoreError::AlreadyFollowing(
89                record.follower_pilot_id.to_string(),
90                record.followee_pilot_id.to_string(),
91            ));
92        }
93        let path = self.follow_records_dir().join(format!("{}.json", record.record_id));
94        if !path.exists() {
95            write_json_file_atomic(&path, &record)?;
96            self.apply_follow(&record);
97        }
98        Ok(())
99    }
100
101    pub fn unfollow_pilot(&self, record: UnfollowRecord) -> Result<(), FollowStoreError> {
102        self.init_dirs()?;
103        record.validate()?;
104        if !self.is_following(&record.follower_pilot_id, &record.followee_pilot_id) {
105            return Err(FollowStoreError::NotFollowing(
106                record.follower_pilot_id.to_string(),
107                record.followee_pilot_id.to_string(),
108            ));
109        }
110        let path = self.unfollow_records_dir().join(format!("{}.json", record.record_id));
111        if !path.exists() {
112            write_json_file_atomic(&path, &record)?;
113            self.apply_unfollow(&record);
114        }
115        Ok(())
116    }
117
118    // ── Query operations ──────────────────────────────────────────────────────
119
120    pub fn list_following(&self, pilot_id: &PilotId) -> Vec<PilotId> {
121        self.following
122            .read()
123            .unwrap()
124            .get(pilot_id)
125            .map(|s| s.iter().cloned().collect())
126            .unwrap_or_default()
127    }
128
129    pub fn list_followers(&self, pilot_id: &PilotId) -> Vec<PilotId> {
130        self.followers
131            .read()
132            .unwrap()
133            .get(pilot_id)
134            .map(|s| s.iter().cloned().collect())
135            .unwrap_or_default()
136    }
137
138    pub fn is_following(&self, follower: &PilotId, followee: &PilotId) -> bool {
139        self.following
140            .read()
141            .unwrap()
142            .get(follower)
143            .map(|s| s.contains(followee))
144            .unwrap_or(false)
145    }
146
147    // ── Private apply helpers ─────────────────────────────────────────────────
148
149    fn apply_follow(&self, record: &FollowRecord) {
150        self.following
151            .write()
152            .unwrap()
153            .entry(record.follower_pilot_id.clone())
154            .or_default()
155            .insert(record.followee_pilot_id.clone());
156        self.followers
157            .write()
158            .unwrap()
159            .entry(record.followee_pilot_id.clone())
160            .or_default()
161            .insert(record.follower_pilot_id.clone());
162    }
163
164    fn apply_unfollow(&self, record: &UnfollowRecord) {
165        if let Some(set) = self.following.write().unwrap().get_mut(&record.follower_pilot_id) {
166            set.remove(&record.followee_pilot_id);
167        }
168        if let Some(set) = self.followers.write().unwrap().get_mut(&record.followee_pilot_id) {
169            set.remove(&record.follower_pilot_id);
170        }
171    }
172
173    // ── Directory paths ───────────────────────────────────────────────────────
174
175    fn follow_records_dir(&self) -> PathBuf { self.root.join(FOLLOW_RECORDS_DIRNAME) }
176    fn unfollow_records_dir(&self) -> PathBuf { self.root.join(UNFOLLOW_RECORDS_DIRNAME) }
177
178    fn load_all_json<T: serde::de::DeserializeOwned>(
179        &self,
180        dir: PathBuf,
181    ) -> Result<Vec<T>, FollowStoreError> {
182        if !dir.exists() {
183            return Ok(Vec::new());
184        }
185        let mut records = Vec::new();
186        for entry in std::fs::read_dir(&dir)?.filter_map(Result::ok) {
187            let path = entry.path();
188            if path.extension().and_then(|e| e.to_str()) != Some("json") {
189                continue;
190            }
191            records.push(serde_json::from_slice(&std::fs::read(&path)?)?);
192        }
193        Ok(records)
194    }
195}
196
197fn write_json_file_atomic<T: serde::Serialize>(
198    path: &Path,
199    value: &T,
200) -> Result<(), FollowStoreError> {
201    write_json_file_atomic_impl(
202        path,
203        value,
204        |parent| {
205            std::fs::create_dir_all(parent)?;
206            Ok(())
207        },
208        |tmp_path, bytes| {
209            std::fs::write(tmp_path, bytes)?;
210            Ok(())
211        },
212        FollowStoreError::MissingParentDirectory,
213    )
214}