dukascopy_fx/storage/
checkpoint.rs1use crate::error::DukascopyError;
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::fs;
8use std::path::{Path, PathBuf};
9use std::sync::Mutex;
10
11pub trait CheckpointStore: Send + Sync {
13 fn get(&self, key: &str) -> Result<Option<DateTime<Utc>>, DukascopyError>;
15 fn set(&self, key: &str, timestamp: DateTime<Utc>) -> Result<(), DukascopyError>;
17 fn set_many(&self, updates: &[(String, DateTime<Utc>)]) -> Result<(), DukascopyError> {
19 for (key, timestamp) in updates {
20 self.set(key, timestamp.to_owned())?;
21 }
22 Ok(())
23 }
24}
25
26#[derive(Debug, Clone, Default, Serialize, Deserialize)]
27struct CheckpointState {
28 checkpoints: HashMap<String, DateTime<Utc>>,
29}
30
31#[derive(Debug)]
33pub struct FileCheckpointStore {
34 path: PathBuf,
35 state: Mutex<CheckpointState>,
36}
37
38impl FileCheckpointStore {
39 pub fn open(path: impl AsRef<Path>) -> Result<Self, DukascopyError> {
41 let path = path.as_ref().to_path_buf();
42 let state = if path.exists() {
43 let content = fs::read_to_string(&path).map_err(|err| {
44 DukascopyError::Unknown(format!(
45 "Failed to read checkpoint file '{}': {}",
46 path.display(),
47 err
48 ))
49 })?;
50 serde_json::from_str::<CheckpointState>(&content).map_err(|err| {
51 DukascopyError::InvalidRequest(format!(
52 "Invalid checkpoint file '{}': {}",
53 path.display(),
54 err
55 ))
56 })?
57 } else {
58 CheckpointState::default()
59 };
60
61 Ok(Self {
62 path,
63 state: Mutex::new(state),
64 })
65 }
66
67 fn persist(&self, state: &CheckpointState) -> Result<(), DukascopyError> {
68 if let Some(parent) = self.path.parent() {
69 fs::create_dir_all(parent).map_err(|err| {
70 DukascopyError::Unknown(format!(
71 "Failed to create checkpoint directory '{}': {}",
72 parent.display(),
73 err
74 ))
75 })?;
76 }
77
78 let temp_path = self.path.with_extension("tmp");
79 let content = serde_json::to_string_pretty(state).map_err(|err| {
80 DukascopyError::Unknown(format!("Failed to serialize checkpoint state: {}", err))
81 })?;
82
83 fs::write(&temp_path, content).map_err(|err| {
84 DukascopyError::Unknown(format!(
85 "Failed to write checkpoint temp file '{}': {}",
86 temp_path.display(),
87 err
88 ))
89 })?;
90
91 if let Err(err) = fs::rename(&temp_path, &self.path) {
92 if self.path.exists() {
95 fs::remove_file(&self.path).map_err(|remove_err| {
96 DukascopyError::Unknown(format!(
97 "Failed to remove existing checkpoint file '{}' after rename error '{}': {}",
98 self.path.display(),
99 err,
100 remove_err
101 ))
102 })?;
103
104 fs::rename(&temp_path, &self.path).map_err(|retry_err| {
105 DukascopyError::Unknown(format!(
106 "Failed to replace checkpoint file '{}' after retry: {}",
107 self.path.display(),
108 retry_err
109 ))
110 })?;
111 } else {
112 return Err(DukascopyError::Unknown(format!(
113 "Failed to replace checkpoint file '{}': {}",
114 self.path.display(),
115 err
116 )));
117 }
118 }
119
120 Ok(())
121 }
122}
123
124impl CheckpointStore for FileCheckpointStore {
125 fn get(&self, key: &str) -> Result<Option<DateTime<Utc>>, DukascopyError> {
126 let state = self
127 .state
128 .lock()
129 .map_err(|err| DukascopyError::Unknown(format!("Checkpoint lock poisoned: {}", err)))?;
130 Ok(state.checkpoints.get(key).cloned())
131 }
132
133 fn set(&self, key: &str, timestamp: DateTime<Utc>) -> Result<(), DukascopyError> {
134 self.set_many(&[(key.to_string(), timestamp)])
135 }
136
137 fn set_many(&self, updates: &[(String, DateTime<Utc>)]) -> Result<(), DukascopyError> {
138 let mut state = self
139 .state
140 .lock()
141 .map_err(|err| DukascopyError::Unknown(format!("Checkpoint lock poisoned: {}", err)))?;
142 for (key, timestamp) in updates {
143 state.checkpoints.insert(key.clone(), timestamp.to_owned());
144 }
145 self.persist(&state)
146 }
147}
148
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;

    /// Nanoseconds elapsed since the Unix epoch; used to give each test run
    /// its own checkpoint file name.
    fn epoch_nanos() -> u128 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos()
    }

    /// A checkpoint written through one store instance is visible to a second
    /// instance opened on the same file.
    #[test]
    fn test_set_and_get_checkpoint() {
        let file_name = format!("dukascopy_fx_checkpoint_test_{}.json", epoch_nanos());
        let path = std::env::temp_dir().join(file_name);

        let ts = Utc.with_ymd_and_hms(2025, 1, 3, 12, 0, 0).unwrap();
        let writer = FileCheckpointStore::open(&path).unwrap();
        writer.set("EURUSD:3600", ts).unwrap();

        let reader = FileCheckpointStore::open(&path).unwrap();
        assert_eq!(reader.get("EURUSD:3600").unwrap(), Some(ts));

        // Best-effort cleanup; a leftover temp file must not fail the test.
        let _ = fs::remove_file(path);
    }

    /// Every entry of a `set_many` batch is persisted and can be reloaded.
    #[test]
    fn test_set_many_persists_all_checkpoints() {
        let file_name = format!("dukascopy_fx_checkpoint_test_many_{}.json", epoch_nanos());
        let path = std::env::temp_dir().join(file_name);

        let ts1 = Utc.with_ymd_and_hms(2025, 1, 3, 12, 0, 0).unwrap();
        let ts2 = Utc.with_ymd_and_hms(2025, 1, 4, 12, 0, 0).unwrap();
        let batch = [
            ("EURUSD:3600".to_string(), ts1),
            ("GBPUSD:3600".to_string(), ts2),
        ];

        let writer = FileCheckpointStore::open(&path).unwrap();
        writer.set_many(&batch).unwrap();

        let reader = FileCheckpointStore::open(&path).unwrap();
        assert_eq!(reader.get("EURUSD:3600").unwrap(), Some(ts1));
        assert_eq!(reader.get("GBPUSD:3600").unwrap(), Some(ts2));

        // Best-effort cleanup; a leftover temp file must not fail the test.
        let _ = fs::remove_file(path);
    }
}