1use std::collections::{HashMap, HashSet};
6use std::path::{Path, PathBuf};
7use std::sync::RwLock;
8
9use crate::follow::{FollowRecord, FollowRecordError, UnfollowRecord};
10use crate::id::PilotId;
11use crate::util::write_json_file_atomic as write_json_file_atomic_impl;
12
13const FOLLOWS_DIRNAME: &str = "follows";
14const FOLLOW_RECORDS_DIRNAME: &str = "follow-records";
15const UNFOLLOW_RECORDS_DIRNAME: &str = "unfollow-records";
16
/// Errors produced by [`FollowStore`] operations.
#[derive(Debug, thiserror::Error)]
pub enum FollowStoreError {
    /// A filesystem operation failed (creating directories, listing or
    /// reading record files, writing a record).
    #[error("I/O: {0}")]
    Io(#[from] std::io::Error),
    /// A JSON (de)serialization step failed, e.g. a record file on disk
    /// could not be parsed.
    #[error("JSON: {0}")]
    Json(#[from] serde_json::Error),
    /// A follow/unfollow record was rejected by its own `validate()` check.
    #[error("follow record: {0}")]
    Record(#[from] FollowRecordError),
    /// A follow was requested for a pair that is already connected.
    /// Fields are the follower id and the followee id, in that order,
    /// rendered with `to_string()`.
    #[error("pilot {0} is already following {1}")]
    AlreadyFollowing(String, String),
    /// An unfollow was requested for a pair that is not connected.
    /// Fields are the follower id and the followee id, in that order,
    /// rendered with `to_string()`.
    #[error("pilot {0} is not following {1}")]
    NotFollowing(String, String),
    /// Passed to the shared atomic-write helper as the error value for a
    /// missing parent directory.
    // NOTE(review): the exact condition under which the helper returns this
    // is defined in `crate::util` and not visible here — confirm there.
    #[error("missing parent directory")]
    MissingParentDirectory,
}
34
/// Follow graph backed by JSON record files on disk, with in-memory indexes
/// for lookups in both directions.
pub struct FollowStore {
    /// Base directory of the store; the `follow-records` and
    /// `unfollow-records` subdirectories are resolved relative to it.
    root: PathBuf,
    /// Forward index: follower id -> set of pilots that follower follows.
    following: RwLock<HashMap<PilotId, HashSet<PilotId>>>,
    /// Reverse index: followee id -> set of pilots following that followee.
    followers: RwLock<HashMap<PilotId, HashSet<PilotId>>>,
}
44
45impl FollowStore {
46 pub fn open(root: impl Into<PathBuf>) -> Self {
47 Self {
48 root: root.into(),
49 following: RwLock::new(HashMap::new()),
50 followers: RwLock::new(HashMap::new()),
51 }
52 }
53
54 pub fn for_data_dir(data_dir: impl AsRef<Path>) -> Self {
55 Self::open(data_dir.as_ref().join(FOLLOWS_DIRNAME))
56 }
57
58 fn init_dirs(&self) -> Result<(), FollowStoreError> {
59 std::fs::create_dir_all(self.follow_records_dir())?;
60 std::fs::create_dir_all(self.unfollow_records_dir())?;
61 Ok(())
62 }
63
64 pub fn init(&self) -> Result<(), FollowStoreError> {
66 self.init_dirs()?;
67 let mut follows = self.load_all_json::<FollowRecord>(self.follow_records_dir())?;
68 follows.sort_by(|a, b| a.created_at.cmp(&b.created_at));
69 for r in follows {
70 r.validate()?;
71 self.apply_follow(&r);
72 }
73 let mut unfollow = self.load_all_json::<UnfollowRecord>(self.unfollow_records_dir())?;
74 unfollow.sort_by(|a, b| a.created_at.cmp(&b.created_at));
75 for r in unfollow {
76 r.validate()?;
77 self.apply_unfollow(&r);
78 }
79 Ok(())
80 }
81
82 pub fn follow_pilot(&self, record: FollowRecord) -> Result<(), FollowStoreError> {
85 self.init_dirs()?;
86 record.validate()?;
87 if self.is_following(&record.follower_pilot_id, &record.followee_pilot_id) {
88 return Err(FollowStoreError::AlreadyFollowing(
89 record.follower_pilot_id.to_string(),
90 record.followee_pilot_id.to_string(),
91 ));
92 }
93 let path = self.follow_records_dir().join(format!("{}.json", record.record_id));
94 if !path.exists() {
95 write_json_file_atomic(&path, &record)?;
96 self.apply_follow(&record);
97 }
98 Ok(())
99 }
100
101 pub fn unfollow_pilot(&self, record: UnfollowRecord) -> Result<(), FollowStoreError> {
102 self.init_dirs()?;
103 record.validate()?;
104 if !self.is_following(&record.follower_pilot_id, &record.followee_pilot_id) {
105 return Err(FollowStoreError::NotFollowing(
106 record.follower_pilot_id.to_string(),
107 record.followee_pilot_id.to_string(),
108 ));
109 }
110 let path = self.unfollow_records_dir().join(format!("{}.json", record.record_id));
111 if !path.exists() {
112 write_json_file_atomic(&path, &record)?;
113 self.apply_unfollow(&record);
114 }
115 Ok(())
116 }
117
118 pub fn list_following(&self, pilot_id: &PilotId) -> Vec<PilotId> {
121 self.following
122 .read()
123 .unwrap()
124 .get(pilot_id)
125 .map(|s| s.iter().cloned().collect())
126 .unwrap_or_default()
127 }
128
129 pub fn list_followers(&self, pilot_id: &PilotId) -> Vec<PilotId> {
130 self.followers
131 .read()
132 .unwrap()
133 .get(pilot_id)
134 .map(|s| s.iter().cloned().collect())
135 .unwrap_or_default()
136 }
137
138 pub fn is_following(&self, follower: &PilotId, followee: &PilotId) -> bool {
139 self.following
140 .read()
141 .unwrap()
142 .get(follower)
143 .map(|s| s.contains(followee))
144 .unwrap_or(false)
145 }
146
147 fn apply_follow(&self, record: &FollowRecord) {
150 self.following
151 .write()
152 .unwrap()
153 .entry(record.follower_pilot_id.clone())
154 .or_default()
155 .insert(record.followee_pilot_id.clone());
156 self.followers
157 .write()
158 .unwrap()
159 .entry(record.followee_pilot_id.clone())
160 .or_default()
161 .insert(record.follower_pilot_id.clone());
162 }
163
164 fn apply_unfollow(&self, record: &UnfollowRecord) {
165 if let Some(set) = self.following.write().unwrap().get_mut(&record.follower_pilot_id) {
166 set.remove(&record.followee_pilot_id);
167 }
168 if let Some(set) = self.followers.write().unwrap().get_mut(&record.followee_pilot_id) {
169 set.remove(&record.follower_pilot_id);
170 }
171 }
172
173 fn follow_records_dir(&self) -> PathBuf { self.root.join(FOLLOW_RECORDS_DIRNAME) }
176 fn unfollow_records_dir(&self) -> PathBuf { self.root.join(UNFOLLOW_RECORDS_DIRNAME) }
177
178 fn load_all_json<T: serde::de::DeserializeOwned>(
179 &self,
180 dir: PathBuf,
181 ) -> Result<Vec<T>, FollowStoreError> {
182 if !dir.exists() {
183 return Ok(Vec::new());
184 }
185 let mut records = Vec::new();
186 for entry in std::fs::read_dir(&dir)?.filter_map(Result::ok) {
187 let path = entry.path();
188 if path.extension().and_then(|e| e.to_str()) != Some("json") {
189 continue;
190 }
191 records.push(serde_json::from_slice(&std::fs::read(&path)?)?);
192 }
193 Ok(records)
194 }
195}
196
/// Writes `value` to `path` through the shared
/// `crate::util::write_json_file_atomic` helper, adapting it to
/// [`FollowStoreError`].
///
/// This wrapper only supplies the filesystem callbacks and the error value
/// the helper needs; the helper itself decides how and when they are used.
// NOTE(review): serialization, temp-file naming and the final rename are
// presumably done inside the helper — confirm in `crate::util`.
///
/// # Errors
///
/// Returns whatever the helper reports, including I/O errors raised by the
/// callbacks below (converted via `?`).
fn write_json_file_atomic<T: serde::Serialize>(
    path: &Path,
    value: &T,
) -> Result<(), FollowStoreError> {
    write_json_file_atomic_impl(
        path,
        value,
        // Callback given the parent directory: create it, including any
        // missing ancestors.
        |parent| {
            std::fs::create_dir_all(parent)?;
            Ok(())
        },
        // Callback given a temporary path and the bytes to store: write
        // them in one call.
        |tmp_path, bytes| {
            std::fs::write(tmp_path, bytes)?;
            Ok(())
        },
        // Error value handed to the helper for the missing-parent case.
        FollowStoreError::MissingParentDirectory,
    )
}