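// things3_cloud/log_cache.rs
//
// Local cache for Things Cloud sync: an append-only item log (`things.log`),
// a sync cursor (`cursor.json`) and a folded-state cache (`state_cache.json`).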

1use crate::client::ThingsCloudClient;
2use crate::store::{fold_item, RawState};
3use crate::wire::wire_object::WireItem;
4use anyhow::{anyhow, Context, Result};
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7use std::fs::{self, File, OpenOptions};
8use std::io::{BufRead, BufReader, Seek, SeekFrom, Write};
9use std::path::{Path, PathBuf};
10
/// History key and head index captured from the sync worker's client.
///
/// `get_state_with_append_log` builds one on the worker thread after
/// `sync_append_log` returns and copies both fields back onto the caller's client.
#[derive(Clone, Debug, Default)]
struct SyncSnapshot {
    /// The worker client's `history_key` once syncing finished.
    history_key: Option<String>,
    /// The worker client's `head_index` once syncing finished.
    head_index: i64,
}
16
17#[derive(Debug, Clone, Serialize, Deserialize, Default)]
18struct CursorData {
19    next_start_index: i64,
20    history_key: String,
21    #[serde(default)]
22    head_index: i64,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize, Default)]
26struct StateCacheData {
27    #[serde(default)]
28    version: u8,
29    log_offset: u64,
30    state: RawState,
31}
32
/// Version stamped into `state_cache.json`; `read_state_cache` discards any cache
/// whose stored version differs from this value.
const STATE_CACHE_VERSION: u8 = 2;
34
35fn read_cursor(path: &Path) -> CursorData {
36    if !path.exists() {
37        return CursorData::default();
38    }
39    let Ok(raw) = fs::read_to_string(path) else {
40        return CursorData::default();
41    };
42    serde_json::from_str(&raw).unwrap_or_default()
43}
44
45fn write_cursor(
46    path: &Path,
47    next_start_index: i64,
48    history_key: &str,
49    head_index: i64,
50) -> Result<()> {
51    let payload = serde_json::to_string(&serde_json::json!({
52        "next_start_index": next_start_index,
53        "history_key": history_key,
54        "head_index": head_index,
55        "updated_at": crate::client::now_timestamp(),
56    }))?;
57    let tmp = path.with_extension("tmp");
58    fs::write(&tmp, payload)?;
59    fs::rename(tmp, path)?;
60    Ok(())
61}
62
63pub fn sync_append_log(client: &mut ThingsCloudClient, cache_dir: &Path) -> Result<()> {
64    fs::create_dir_all(cache_dir)?;
65    let log_path = cache_dir.join("things.log");
66    let cursor_path = cache_dir.join("cursor.json");
67
68    let cursor = read_cursor(&cursor_path);
69    let mut start_index = cursor.next_start_index;
70
71    if client.history_key.is_none() {
72        if !cursor.history_key.is_empty() {
73            client.history_key = Some(cursor.history_key.clone());
74        } else {
75            let _ = client.authenticate()?;
76        }
77    }
78
79    let mut fp = OpenOptions::new()
80        .create(true)
81        .append(true)
82        .open(&log_path)
83        .with_context(|| format!("failed to open {}", log_path.display()))?;
84
85    loop {
86        let page = match client.get_items_page(start_index) {
87            Ok(v) => v,
88            Err(_) => {
89                let _ = client.authenticate()?;
90                client.get_items_page(start_index)?
91            }
92        };
93
94        let items = page
95            .get("items")
96            .and_then(Value::as_array)
97            .cloned()
98            .unwrap_or_default();
99        let end = page
100            .get("end-total-content-size")
101            .and_then(Value::as_i64)
102            .unwrap_or(0);
103        let latest = page
104            .get("latest-total-content-size")
105            .and_then(Value::as_i64)
106            .unwrap_or(0);
107        client.head_index = page
108            .get("current-item-index")
109            .and_then(Value::as_i64)
110            .unwrap_or(client.head_index);
111
112        for item in &items {
113            writeln!(fp, "{}", serde_json::to_string(item)?)?;
114        }
115
116        if !items.is_empty() {
117            fp.flush()?;
118            start_index += items.len() as i64;
119            write_cursor(
120                &cursor_path,
121                start_index,
122                client.history_key.as_deref().unwrap_or_default(),
123                client.head_index,
124            )?;
125        }
126
127        if items.is_empty() || end >= latest {
128            break;
129        }
130    }
131
132    let current_history_key = client.history_key.clone().unwrap_or_default();
133    if current_history_key != cursor.history_key || client.head_index != cursor.head_index {
134        write_cursor(
135            &cursor_path,
136            start_index,
137            &current_history_key,
138            client.head_index,
139        )?;
140    }
141
142    Ok(())
143}
144
145fn read_state_cache(cache_dir: &Path) -> (RawState, u64) {
146    let path = cache_dir.join("state_cache.json");
147    if !path.exists() {
148        return (RawState::new(), 0);
149    }
150    let Ok(raw) = fs::read_to_string(&path) else {
151        return (RawState::new(), 0);
152    };
153    let Ok(cache) = serde_json::from_str::<StateCacheData>(&raw) else {
154        return (RawState::new(), 0);
155    };
156
157    if cache.version != STATE_CACHE_VERSION {
158        return (RawState::new(), 0);
159    }
160
161    (cache.state, cache.log_offset)
162}
163
164fn write_state_cache(cache_dir: &Path, state: &RawState, log_offset: u64) -> Result<()> {
165    let path = cache_dir.join("state_cache.json");
166    let payload = serde_json::to_string(&StateCacheData {
167        version: STATE_CACHE_VERSION,
168        log_offset,
169        state: state.clone(),
170    })?;
171    let tmp = path.with_extension("tmp");
172    fs::write(&tmp, payload)?;
173    fs::rename(tmp, path)?;
174    Ok(())
175}
176
177pub fn fold_state_from_append_log(cache_dir: &Path) -> Result<RawState> {
178    let log_path = cache_dir.join("things.log");
179    if !log_path.exists() {
180        return Ok(RawState::new());
181    }
182
183    let (mut state, byte_offset) = read_state_cache(cache_dir);
184    let mut new_lines = 0u64;
185
186    let mut file =
187        File::open(&log_path).with_context(|| format!("failed to open {}", log_path.display()))?;
188    file.seek(SeekFrom::Start(byte_offset))?;
189    let mut reader = BufReader::new(file);
190    let mut line = String::new();
191    let mut safe_offset = byte_offset;
192
193    loop {
194        line.clear();
195        let read = reader.read_line(&mut line)?;
196        if read == 0 {
197            break;
198        }
199
200        if !line.ends_with('\n') {
201            break;
202        }
203
204        let stripped = line.trim();
205        if stripped.is_empty() {
206            safe_offset = reader.stream_position()?;
207            continue;
208        }
209        let item: WireItem = serde_json::from_str(stripped)
210            .with_context(|| format!("Corrupt log entry at {}", log_path.display()))?;
211        fold_item(item, &mut state);
212        new_lines += 1;
213        safe_offset = reader.stream_position()?;
214    }
215
216    if new_lines > 0 {
217        write_state_cache(cache_dir, &state, safe_offset)?;
218    }
219
220    Ok(state)
221}
222
223pub fn get_state_with_append_log(
224    client: &mut ThingsCloudClient,
225    cache_dir: PathBuf,
226) -> Result<RawState> {
227    let mut sync_client = client.clone();
228    let sync_cache_dir = cache_dir.clone();
229
230    let sync_worker = std::thread::spawn(move || -> Result<SyncSnapshot> {
231        sync_append_log(&mut sync_client, &sync_cache_dir)?;
232        Ok(SyncSnapshot {
233            history_key: sync_client.history_key,
234            head_index: sync_client.head_index,
235        })
236    });
237
238    let _stale_state = fold_state_from_append_log(&cache_dir)?;
239
240    let sync_snapshot = sync_worker
241        .join()
242        .map_err(|_| anyhow!("sync worker panicked"))??;
243
244    client.history_key = sync_snapshot.history_key;
245    client.head_index = sync_snapshot.head_index;
246
247    fold_state_from_append_log(&cache_dir)
248}
249
250pub fn fold_state_from_append_log_or_empty(cache_dir: &Path) -> RawState {
251    fold_state_from_append_log(cache_dir).unwrap_or_default()
252}
253
254pub fn read_cached_head_index(cache_dir: &Path) -> i64 {
255    read_cursor(&cache_dir.join("cursor.json")).head_index
256}
257
258pub fn sync_append_log_or_err(client: &mut ThingsCloudClient, cache_dir: &Path) -> Result<()> {
259    sync_append_log(client, cache_dir).map_err(|e| anyhow!(e.to_string()))
260}
261
#[cfg(test)]
mod tests {
    use super::*;

    /// A record whose newline has not been written yet must be skipped, and the cached
    /// offset must stay at the end of the last complete line until the rest arrives.
    #[test]
    fn fold_state_ignores_trailing_partial_line() {
        let scratch = tempfile::tempdir().expect("tempdir");
        let cache_dir = scratch.path();
        let log_path = cache_dir.join("things.log");

        let complete = r#"{"3C6BBD49-8D11-4FFF-8B0E-B8F33FA9C00A":{"t":0,"e":"Settings5","p":{}}}"#;
        let pending = r#"{"4C6BBD49-8D11-4FFF-8B0E-B8F33FA9C00B":{"t":0,"e":"Settings5","p":{}}}"#;
        let (head, tail) = pending.split_at(pending.len() / 2);

        // Seed: one full line followed by the first half of the next record.
        fs::write(&log_path, format!("{}\n{}", complete, head)).expect("seed log");

        let state_before = fold_state_from_append_log(cache_dir).expect("first fold");
        assert_eq!(state_before.len(), 1);

        let (_, offset_before) = read_state_cache(cache_dir);
        assert_eq!(offset_before, (complete.len() + 1) as u64);

        // Finish the partial record, newline included.
        let mut log = OpenOptions::new()
            .append(true)
            .open(&log_path)
            .expect("open log for append");
        writeln!(log, "{}", tail).expect("append line remainder");

        let state_after = fold_state_from_append_log(cache_dir).expect("second fold");
        assert_eq!(state_after.len(), 2);

        let log_len = fs::metadata(&log_path).expect("log metadata").len();
        let (_, offset_after) = read_state_cache(cache_dir);
        assert_eq!(offset_after, log_len);
    }
}