Skip to main content

dukascopy_fx/storage/
checkpoint.rs

1//! Checkpoint storage for incremental fetch flows.
2
3use crate::error::DukascopyError;
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::fs;
8use std::path::{Path, PathBuf};
9use std::sync::Mutex;
10
11/// Abstraction for persisting per-stream checkpoints.
12pub trait CheckpointStore: Send + Sync {
13    /// Reads checkpoint for a key.
14    fn get(&self, key: &str) -> Result<Option<DateTime<Utc>>, DukascopyError>;
15    /// Persists checkpoint for a key.
16    fn set(&self, key: &str, timestamp: DateTime<Utc>) -> Result<(), DukascopyError>;
17    /// Persists multiple checkpoints in one operation.
18    fn set_many(&self, updates: &[(String, DateTime<Utc>)]) -> Result<(), DukascopyError> {
19        for (key, timestamp) in updates {
20            self.set(key, timestamp.to_owned())?;
21        }
22        Ok(())
23    }
24}
25
26#[derive(Debug, Clone, Default, Serialize, Deserialize)]
27struct CheckpointState {
28    checkpoints: HashMap<String, DateTime<Utc>>,
29}
30
31/// JSON file-based checkpoint store.
32#[derive(Debug)]
33pub struct FileCheckpointStore {
34    path: PathBuf,
35    state: Mutex<CheckpointState>,
36}
37
38impl FileCheckpointStore {
39    /// Opens a file checkpoint store from a path. Creates an empty store when file does not exist.
40    pub fn open(path: impl AsRef<Path>) -> Result<Self, DukascopyError> {
41        let path = path.as_ref().to_path_buf();
42        let state = if path.exists() {
43            let content = fs::read_to_string(&path).map_err(|err| {
44                DukascopyError::Unknown(format!(
45                    "Failed to read checkpoint file '{}': {}",
46                    path.display(),
47                    err
48                ))
49            })?;
50            serde_json::from_str::<CheckpointState>(&content).map_err(|err| {
51                DukascopyError::InvalidRequest(format!(
52                    "Invalid checkpoint file '{}': {}",
53                    path.display(),
54                    err
55                ))
56            })?
57        } else {
58            CheckpointState::default()
59        };
60
61        Ok(Self {
62            path,
63            state: Mutex::new(state),
64        })
65    }
66
67    fn persist(&self, state: &CheckpointState) -> Result<(), DukascopyError> {
68        if let Some(parent) = self.path.parent() {
69            fs::create_dir_all(parent).map_err(|err| {
70                DukascopyError::Unknown(format!(
71                    "Failed to create checkpoint directory '{}': {}",
72                    parent.display(),
73                    err
74                ))
75            })?;
76        }
77
78        let temp_path = self.path.with_extension("tmp");
79        let content = serde_json::to_string_pretty(state).map_err(|err| {
80            DukascopyError::Unknown(format!("Failed to serialize checkpoint state: {}", err))
81        })?;
82
83        fs::write(&temp_path, content).map_err(|err| {
84            DukascopyError::Unknown(format!(
85                "Failed to write checkpoint temp file '{}': {}",
86                temp_path.display(),
87                err
88            ))
89        })?;
90
91        if let Err(err) = fs::rename(&temp_path, &self.path) {
92            // Some platforms/filesystems fail renaming over an existing destination.
93            // Retry by removing destination first.
94            if self.path.exists() {
95                fs::remove_file(&self.path).map_err(|remove_err| {
96                    DukascopyError::Unknown(format!(
97                        "Failed to remove existing checkpoint file '{}' after rename error '{}': {}",
98                        self.path.display(),
99                        err,
100                        remove_err
101                    ))
102                })?;
103
104                fs::rename(&temp_path, &self.path).map_err(|retry_err| {
105                    DukascopyError::Unknown(format!(
106                        "Failed to replace checkpoint file '{}' after retry: {}",
107                        self.path.display(),
108                        retry_err
109                    ))
110                })?;
111            } else {
112                return Err(DukascopyError::Unknown(format!(
113                    "Failed to replace checkpoint file '{}': {}",
114                    self.path.display(),
115                    err
116                )));
117            }
118        }
119
120        Ok(())
121    }
122}
123
124impl CheckpointStore for FileCheckpointStore {
125    fn get(&self, key: &str) -> Result<Option<DateTime<Utc>>, DukascopyError> {
126        let state = self
127            .state
128            .lock()
129            .map_err(|err| DukascopyError::Unknown(format!("Checkpoint lock poisoned: {}", err)))?;
130        Ok(state.checkpoints.get(key).cloned())
131    }
132
133    fn set(&self, key: &str, timestamp: DateTime<Utc>) -> Result<(), DukascopyError> {
134        self.set_many(&[(key.to_string(), timestamp)])
135    }
136
137    fn set_many(&self, updates: &[(String, DateTime<Utc>)]) -> Result<(), DukascopyError> {
138        let mut state = self
139            .state
140            .lock()
141            .map_err(|err| DukascopyError::Unknown(format!("Checkpoint lock poisoned: {}", err)))?;
142        for (key, timestamp) in updates {
143            state.checkpoints.insert(key.clone(), timestamp.to_owned());
144        }
145        self.persist(&state)
146    }
147}
148
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;

    /// Nanoseconds since the Unix epoch, used to give each test its own file name.
    fn epoch_nanos() -> u128 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos()
    }

    /// A checkpoint written through one store instance is visible after reopening the file.
    #[test]
    fn test_set_and_get_checkpoint() {
        let file_name = format!("dukascopy_fx_checkpoint_test_{}.json", epoch_nanos());
        let path = std::env::temp_dir().join(file_name);

        let ts = Utc.with_ymd_and_hms(2025, 1, 3, 12, 0, 0).unwrap();
        let store = FileCheckpointStore::open(&path).unwrap();
        store.set("EURUSD:3600", ts).unwrap();

        let reopened = FileCheckpointStore::open(&path).unwrap();
        assert_eq!(reopened.get("EURUSD:3600").unwrap(), Some(ts));

        let _ = fs::remove_file(path);
    }

    /// Every pair passed to `set_many` is visible after reopening the file.
    #[test]
    fn test_set_many_persists_all_checkpoints() {
        let file_name = format!("dukascopy_fx_checkpoint_test_many_{}.json", epoch_nanos());
        let path = std::env::temp_dir().join(file_name);

        let ts1 = Utc.with_ymd_and_hms(2025, 1, 3, 12, 0, 0).unwrap();
        let ts2 = Utc.with_ymd_and_hms(2025, 1, 4, 12, 0, 0).unwrap();
        let batch = vec![
            ("EURUSD:3600".to_string(), ts1),
            ("GBPUSD:3600".to_string(), ts2),
        ];

        let store = FileCheckpointStore::open(&path).unwrap();
        store.set_many(&batch).unwrap();

        let reopened = FileCheckpointStore::open(&path).unwrap();
        assert_eq!(reopened.get("EURUSD:3600").unwrap(), Some(ts1));
        assert_eq!(reopened.get("GBPUSD:3600").unwrap(), Some(ts2));

        let _ = fs::remove_file(path);
    }
}
201}