jj_lib/
simple_op_heads_store.rs1#![expect(missing_docs)]
16
17use std::fmt::Debug;
18use std::fmt::Formatter;
19use std::fs;
20use std::io;
21use std::path::Path;
22use std::path::PathBuf;
23
24use async_trait::async_trait;
25use thiserror::Error;
26
27use crate::backend::BackendInitError;
28use crate::file_util::IoResultExt as _;
29use crate::file_util::PathError;
30use crate::hex_util;
31use crate::lock::FileLock;
32use crate::object_id::ObjectId as _;
33use crate::op_heads_store::OpHeadsStore;
34use crate::op_heads_store::OpHeadsStoreError;
35use crate::op_heads_store::OpHeadsStoreLock;
36use crate::op_store::OperationId;
37
38#[derive(Debug, Error)]
40#[error("Failed to initialize simple operation heads store")]
41pub struct SimpleOpHeadsStoreInitError(#[from] pub PathError);
42
43impl From<SimpleOpHeadsStoreInitError> for BackendInitError {
44 fn from(err: SimpleOpHeadsStoreInitError) -> Self {
45 Self(err.into())
46 }
47}
48
49pub struct SimpleOpHeadsStore {
50 dir: PathBuf,
51}
52
53impl Debug for SimpleOpHeadsStore {
54 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
55 f.debug_struct("SimpleOpHeadsStore")
56 .field("dir", &self.dir)
57 .finish()
58 }
59}
60
61impl SimpleOpHeadsStore {
62 pub fn name() -> &'static str {
63 "simple_op_heads_store"
64 }
65
66 pub fn init(dir: &Path, root_op_id: &OperationId) -> Result<Self, SimpleOpHeadsStoreInitError> {
67 let op_heads_dir = dir.join("heads");
68 fs::create_dir(&op_heads_dir).context(&op_heads_dir)?;
69 let store = Self { dir: op_heads_dir };
70 store.add_op_head(root_op_id)?;
71 Ok(store)
72 }
73
74 pub fn load(dir: &Path) -> Self {
75 let op_heads_dir = dir.join("heads");
76 Self { dir: op_heads_dir }
77 }
78
79 fn add_op_head(&self, id: &OperationId) -> Result<(), PathError> {
80 let path = self.dir.join(id.hex());
81 std::fs::write(&path, "").context(path)
82 }
83
84 fn remove_op_head(&self, id: &OperationId) -> Result<(), PathError> {
85 let path = self.dir.join(id.hex());
86 std::fs::remove_file(&path)
87 .or_else(|err| {
88 if err.kind() == io::ErrorKind::NotFound {
89 Ok(())
94 } else {
95 Err(err)
96 }
97 })
98 .context(path)
99 }
100}
101
102struct SimpleOpHeadsStoreLock {
103 _lock: FileLock,
104}
105
106impl OpHeadsStoreLock for SimpleOpHeadsStoreLock {}
107
108#[async_trait]
109impl OpHeadsStore for SimpleOpHeadsStore {
110 fn name(&self) -> &str {
111 Self::name()
112 }
113
114 async fn update_op_heads(
115 &self,
116 old_ids: &[OperationId],
117 new_id: &OperationId,
118 ) -> Result<(), OpHeadsStoreError> {
119 self.add_op_head(new_id)
120 .map_err(|err| OpHeadsStoreError::Write {
121 new_op_id: new_id.clone(),
122 source: err.into(),
123 })?;
124 for old_id in old_ids {
125 if old_id == new_id {
126 continue;
127 }
128 self.remove_op_head(old_id)
129 .map_err(|err| OpHeadsStoreError::Write {
130 new_op_id: new_id.clone(),
131 source: err.into(),
132 })?;
133 }
134 Ok(())
135 }
136
137 async fn get_op_heads(&self) -> Result<Vec<OperationId>, OpHeadsStoreError> {
138 let mut op_heads = vec![];
139 for op_head_entry in
140 std::fs::read_dir(&self.dir).map_err(|err| OpHeadsStoreError::Read(err.into()))?
141 {
142 let op_head_file_name = op_head_entry
143 .map_err(|err| OpHeadsStoreError::Read(err.into()))?
144 .file_name();
145 let op_head_file_name = op_head_file_name.to_str().ok_or_else(|| {
146 OpHeadsStoreError::Read(
147 format!("Non-utf8 in op head file name: {op_head_file_name:?}").into(),
148 )
149 })?;
150 if let Some(op_head) = hex_util::decode_hex(op_head_file_name) {
151 op_heads.push(OperationId::new(op_head));
152 }
153 }
154 op_heads.sort();
155 if op_heads.is_empty() {
156 Err(OpHeadsStoreError::Read(
157 "Corrupt repository: no head operation".into(),
158 ))
159 } else {
160 Ok(op_heads)
161 }
162 }
163
164 async fn lock(&self) -> Result<Box<dyn OpHeadsStoreLock + '_>, OpHeadsStoreError> {
165 let lock = FileLock::lock(self.dir.join("lock"))
166 .map_err(|err| OpHeadsStoreError::Lock(err.into()))?;
167 Ok(Box::new(SimpleOpHeadsStoreLock { _lock: lock }))
168 }
169}
170
171#[cfg(test)]
172mod tests {
173
174 use std::slice;
175
176 use pollster::FutureExt as _;
177
178 use super::*;
179 use crate::tests::TestResult;
180
181 #[test]
182 fn test_op_heads() -> TestResult {
183 let dir = tempfile::tempdir()?;
184
185 let op1 = OperationId::from_hex("1111");
186 let op2 = OperationId::from_hex("2222");
187 let op3 = OperationId::from_hex("3333");
188 let op4 = OperationId::from_hex("4444");
189
190 let op_heads_store = SimpleOpHeadsStore::init(dir.path(), &op1)?;
192 let op_heads = op_heads_store.get_op_heads().block_on()?;
193 assert_eq!(op_heads, vec![op1.clone()]);
194
195 op_heads_store
197 .update_op_heads(slice::from_ref(&op1), &op2)
198 .block_on()?;
199 let op_heads = op_heads_store.get_op_heads().block_on()?;
200 assert_eq!(op_heads, vec![op2.clone()]);
201
202 op_heads_store.update_op_heads(&[], &op2).block_on()?;
204 let op_heads = op_heads_store.get_op_heads().block_on()?;
205 assert_eq!(op_heads, vec![op2.clone()]);
206
207 op_heads_store
209 .update_op_heads(slice::from_ref(&op1), &op2)
210 .block_on()?;
211 let op_heads = op_heads_store.get_op_heads().block_on()?;
212 assert_eq!(op_heads, vec![op2.clone()]);
213
214 op_heads_store.update_op_heads(&[], &op3).block_on()?;
216 let op_heads = op_heads_store.get_op_heads().block_on()?;
217 assert_eq!(op_heads, vec![op2.clone(), op3.clone()]);
218
219 op_heads_store
221 .update_op_heads(&[op2.clone(), op3.clone()], &op4)
222 .block_on()?;
223 let op_heads = op_heads_store.get_op_heads().block_on()?;
224 assert_eq!(op_heads, vec![op4.clone()]);
225
226 op_heads_store.update_op_heads(&[], &op3).block_on()?;
228 op_heads_store
229 .update_op_heads(&[op3.clone(), op4.clone()], &op4)
230 .block_on()?;
231 let op_heads = op_heads_store.get_op_heads().block_on()?;
232 assert_eq!(op_heads, vec![op4.clone()]);
233
234 Ok(())
235 }
236}