Skip to main content

things3_cloud/
log_cache.rs

1use std::{
2    fs::{self, File, OpenOptions},
3    io::{BufRead, BufReader, Seek, SeekFrom, Write},
4    path::{Path, PathBuf},
5};
6
7use anyhow::{Context, Result, anyhow};
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10
11use crate::{
12    client::ThingsCloudClient,
13    store::{RawState, fold_item},
14    wire::wire_object::WireItem,
15};
16
/// Values read off the sync worker's client after `sync_append_log` returns,
/// handed back to the caller's thread by `get_state_with_append_log`.
#[derive(Clone, Debug, Default)]
struct SyncSnapshot {
    /// The worker client's `history_key` at the end of the sync.
    history_key: Option<String>,
    /// The worker client's `head_index` at the end of the sync.
    head_index: i64,
}
22
23#[derive(Debug, Clone, Serialize, Deserialize, Default)]
24struct CursorData {
25    next_start_index: i64,
26    history_key: String,
27    #[serde(default)]
28    head_index: i64,
29}
30
31#[derive(Debug, Clone, Serialize, Deserialize, Default)]
32struct StateCacheData {
33    #[serde(default)]
34    version: u8,
35    log_offset: u64,
36    state: RawState,
37}
38
39const STATE_CACHE_VERSION: u8 = 2;
40
41fn read_cursor(path: &Path) -> CursorData {
42    if !path.exists() {
43        return CursorData::default();
44    }
45    let Ok(raw) = fs::read_to_string(path) else {
46        return CursorData::default();
47    };
48    serde_json::from_str(&raw).unwrap_or_default()
49}
50
51fn write_cursor(
52    path: &Path,
53    next_start_index: i64,
54    history_key: &str,
55    head_index: i64,
56) -> Result<()> {
57    let payload = serde_json::to_string(&serde_json::json!({
58        "next_start_index": next_start_index,
59        "history_key": history_key,
60        "head_index": head_index,
61        "updated_at": crate::client::now_timestamp(),
62    }))?;
63    let tmp = path.with_extension("tmp");
64    fs::write(&tmp, payload)?;
65    fs::rename(tmp, path)?;
66    Ok(())
67}
68
69pub fn sync_append_log(client: &mut ThingsCloudClient, cache_dir: &Path) -> Result<()> {
70    fs::create_dir_all(cache_dir)?;
71    let log_path = cache_dir.join("things.log");
72    let cursor_path = cache_dir.join("cursor.json");
73
74    let cursor = read_cursor(&cursor_path);
75    let mut start_index = cursor.next_start_index;
76
77    if client.history_key.is_none() {
78        if !cursor.history_key.is_empty() {
79            client.history_key = Some(cursor.history_key.clone());
80        } else {
81            let _ = client.authenticate()?;
82        }
83    }
84
85    let mut fp = OpenOptions::new()
86        .create(true)
87        .append(true)
88        .open(&log_path)
89        .with_context(|| format!("failed to open {}", log_path.display()))?;
90
91    loop {
92        let page = match client.get_items_page(start_index) {
93            Ok(v) => v,
94            Err(_) => {
95                let _ = client.authenticate()?;
96                client.get_items_page(start_index)?
97            }
98        };
99
100        let items = page
101            .get("items")
102            .and_then(Value::as_array)
103            .cloned()
104            .unwrap_or_default();
105        let end = page
106            .get("end-total-content-size")
107            .and_then(Value::as_i64)
108            .unwrap_or(0);
109        let latest = page
110            .get("latest-total-content-size")
111            .and_then(Value::as_i64)
112            .unwrap_or(0);
113        client.head_index = page
114            .get("current-item-index")
115            .and_then(Value::as_i64)
116            .unwrap_or(client.head_index);
117
118        for item in &items {
119            writeln!(fp, "{}", serde_json::to_string(item)?)?;
120        }
121
122        if !items.is_empty() {
123            fp.flush()?;
124            start_index += items.len() as i64;
125            write_cursor(
126                &cursor_path,
127                start_index,
128                client.history_key.as_deref().unwrap_or_default(),
129                client.head_index,
130            )?;
131        }
132
133        if items.is_empty() || end >= latest {
134            break;
135        }
136    }
137
138    let current_history_key = client.history_key.clone().unwrap_or_default();
139    if current_history_key != cursor.history_key || client.head_index != cursor.head_index {
140        write_cursor(
141            &cursor_path,
142            start_index,
143            &current_history_key,
144            client.head_index,
145        )?;
146    }
147
148    Ok(())
149}
150
151fn read_state_cache(cache_dir: &Path) -> (RawState, u64) {
152    let path = cache_dir.join("state_cache.json");
153    if !path.exists() {
154        return (RawState::new(), 0);
155    }
156    let Ok(raw) = fs::read_to_string(&path) else {
157        return (RawState::new(), 0);
158    };
159    let Ok(cache) = serde_json::from_str::<StateCacheData>(&raw) else {
160        return (RawState::new(), 0);
161    };
162
163    if cache.version != STATE_CACHE_VERSION {
164        return (RawState::new(), 0);
165    }
166
167    (cache.state, cache.log_offset)
168}
169
170fn write_state_cache(cache_dir: &Path, state: &RawState, log_offset: u64) -> Result<()> {
171    let path = cache_dir.join("state_cache.json");
172    let payload = serde_json::to_string(&StateCacheData {
173        version: STATE_CACHE_VERSION,
174        log_offset,
175        state: state.clone(),
176    })?;
177    let tmp = path.with_extension("tmp");
178    fs::write(&tmp, payload)?;
179    fs::rename(tmp, path)?;
180    Ok(())
181}
182
183pub fn fold_state_from_append_log(cache_dir: &Path) -> Result<RawState> {
184    let log_path = cache_dir.join("things.log");
185    if !log_path.exists() {
186        return Ok(RawState::new());
187    }
188
189    let (mut state, byte_offset) = read_state_cache(cache_dir);
190    let mut new_lines = 0u64;
191
192    let mut file =
193        File::open(&log_path).with_context(|| format!("failed to open {}", log_path.display()))?;
194    file.seek(SeekFrom::Start(byte_offset))?;
195    let mut reader = BufReader::new(file);
196    let mut line = String::new();
197    let mut safe_offset = byte_offset;
198
199    loop {
200        line.clear();
201        let read = reader.read_line(&mut line)?;
202        if read == 0 {
203            break;
204        }
205
206        if !line.ends_with('\n') {
207            break;
208        }
209
210        let stripped = line.trim();
211        if stripped.is_empty() {
212            safe_offset = reader.stream_position()?;
213            continue;
214        }
215        let item: WireItem = serde_json::from_str(stripped)
216            .with_context(|| format!("Corrupt log entry at {}", log_path.display()))?;
217        fold_item(item, &mut state);
218        new_lines += 1;
219        safe_offset = reader.stream_position()?;
220    }
221
222    if new_lines > 0 {
223        write_state_cache(cache_dir, &state, safe_offset)?;
224    }
225
226    Ok(state)
227}
228
229pub fn get_state_with_append_log(
230    client: &mut ThingsCloudClient,
231    cache_dir: PathBuf,
232) -> Result<RawState> {
233    let mut sync_client = client.clone();
234    let sync_cache_dir = cache_dir.clone();
235
236    let sync_worker = std::thread::spawn(move || -> Result<SyncSnapshot> {
237        sync_append_log(&mut sync_client, &sync_cache_dir)?;
238        Ok(SyncSnapshot {
239            history_key: sync_client.history_key,
240            head_index: sync_client.head_index,
241        })
242    });
243
244    let _stale_state = fold_state_from_append_log(&cache_dir)?;
245
246    let sync_snapshot = sync_worker
247        .join()
248        .map_err(|_| anyhow!("sync worker panicked"))??;
249
250    client.history_key = sync_snapshot.history_key;
251    client.head_index = sync_snapshot.head_index;
252
253    fold_state_from_append_log(&cache_dir)
254}
255
256pub fn fold_state_from_append_log_or_empty(cache_dir: &Path) -> RawState {
257    fold_state_from_append_log(cache_dir).unwrap_or_default()
258}
259
260pub fn read_cached_head_index(cache_dir: &Path) -> i64 {
261    read_cursor(&cache_dir.join("cursor.json")).head_index
262}
263
264pub fn sync_append_log_or_err(client: &mut ThingsCloudClient, cache_dir: &Path) -> Result<()> {
265    sync_append_log(client, cache_dir).map_err(|e| anyhow!(e.to_string()))
266}
267
#[cfg(test)]
mod tests {
    use super::*;

    /// A log whose last line has no newline yet must be folded only up to the
    /// last complete line; once the rest of that line is appended, the next
    /// fold picks it up and the cached offset reaches the end of the file.
    #[test]
    fn fold_state_ignores_trailing_partial_line() {
        let scratch = tempfile::tempdir().expect("tempdir");
        let cache_dir = scratch.path();
        let log_path = cache_dir.join("things.log");

        let complete_entry =
            r#"{"3C6BBD49-8D11-4FFF-8B0E-B8F33FA9C00A":{"t":0,"e":"Settings5","p":{}}}"#;
        let torn_entry =
            r#"{"4C6BBD49-8D11-4FFF-8B0E-B8F33FA9C00B":{"t":0,"e":"Settings5","p":{}}}"#;
        let (torn_head, torn_tail) = torn_entry.split_at(torn_entry.len() / 2);

        // Seed: one full line followed by the first half of a second line.
        let seeded = format!("{}\n{}", complete_entry, torn_head);
        fs::write(&log_path, seeded).expect("seed log");

        let state_before = fold_state_from_append_log(cache_dir).expect("first fold");
        assert_eq!(state_before.len(), 1);

        let (_, offset_before) = read_state_cache(cache_dir);
        assert_eq!(offset_before, (complete_entry.len() + 1) as u64);

        // Complete the torn line, newline included.
        let mut log = OpenOptions::new()
            .append(true)
            .open(&log_path)
            .expect("open log for append");
        writeln!(log, "{}", torn_tail).expect("append line remainder");

        let state_after = fold_state_from_append_log(cache_dir).expect("second fold");
        assert_eq!(state_after.len(), 2);

        let log_len = fs::metadata(&log_path).expect("log metadata").len();
        let (_, offset_after) = read_state_cache(cache_dir);
        assert_eq!(offset_after, log_len);
    }
}