Skip to main content

client_core/sync/
reader.rs

1use super::map;
2use crate::crypto;
3use crate::http::{HttpError, RestClient};
4use crate::store::{queries, Store, StoreError};
5use std::time::{Duration, Instant};
6use tracing::warn;
7
/// Wall-time and volume budget for a single backfill pass.
///
/// The type is small and plain data, so it derives `Copy`: a caller can hand
/// the same budget to several passes without rebuilding it, and `Debug` /
/// `PartialEq` make it loggable and comparable in tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BackfillBudget {
    /// Maximum elapsed time before the pass is cut short.
    pub max_wall: Duration,
    /// Maximum number of clips to fetch from the relay in one pass.
    pub max_clips: u32,
}
15
16impl Default for BackfillBudget {
17    fn default() -> Self {
18        Self {
19            max_wall: Duration::from_secs(2),
20            max_clips: 500,
21        }
22    }
23}
24
25/// Convert a stored ULID watermark string to a `chrono::DateTime<chrono::Utc>`
26/// so it can be passed to [`RestClient::list_clips_since`].
27///
28/// Returns `None` if the string is empty or cannot be parsed as a ULID.
29fn ulid_to_datetime(ulid_str: &str) -> Option<chrono::DateTime<chrono::Utc>> {
30    use std::time::UNIX_EPOCH;
31    let ulid = ulid::Ulid::from_string(ulid_str).ok()?;
32    let ms = ulid.datetime().duration_since(UNIX_EPOCH).ok()?.as_millis() as i64;
33    chrono::DateTime::from_timestamp_millis(ms)
34}
35
36/// One pass of REST backfill from `meta.last_sync_watermark` forward.
37///
38/// Fetches clips newer than the stored watermark, decrypts any encrypted clips
39/// using `enc_key` (when provided), writes them into the local store, and
40/// advances the watermark to the highest clip ID seen.  Returns the number of
41/// clips inserted.
42///
43/// The HTTP layer does **not** decrypt — callers must supply the AES-256 key
44/// when the account uses client-side encryption.  If an encrypted clip is
45/// encountered and `enc_key` is `None`, or decryption fails, the clip is
46/// logged and skipped (same policy as design doc §9 risk item).
47pub async fn backfill_once(
48    store: &Store,
49    client: &RestClient,
50    budget: BackfillBudget,
51    enc_key: Option<&[u8; 32]>,
52) -> Result<usize, BackfillError> {
53    let start = Instant::now();
54
55    // Resolve the watermark: stored as a ULID → extract the embedded timestamp
56    // and pass it as the `since` parameter so the relay returns only newer clips.
57    let since = queries::watermark(store)
58        .map_err(BackfillError::Store)?
59        .and_then(|w| ulid_to_datetime(&w));
60
61    let clips = client
62        .list_clips_since(since, budget.max_clips)
63        .await
64        .map_err(BackfillError::Http)?;
65
66    let mut inserted = 0usize;
67    let mut max_id: Option<String> = None;
68
69    for mut clip in clips {
70        if start.elapsed() >= budget.max_wall {
71            break;
72        }
73
74        // Decrypt before mapping: the HTTP layer returns raw wire bytes and
75        // does not decrypt.  Skip clips we cannot decrypt rather than storing
76        // ciphertext, which would break FTS5 search and downstream rendering.
77        if clip.encrypted {
78            match enc_key {
79                None => {
80                    warn!(
81                        clip_id = %clip.clip_id,
82                        "backfill: skipping encrypted clip — no encryption key available"
83                    );
84                    continue;
85                }
86                Some(key) => {
87                    match crypto::decrypt(key, &clip.content) {
88                        Ok(plaintext) => {
89                            // Store decrypted content as UTF-8.  Non-UTF-8 bytes
90                            // (e.g. image data) are preserved via lossy conversion;
91                            // callers using the image branch recover bytes from
92                            // `StoredClip.content`.
93                            clip.content = String::from_utf8_lossy(&plaintext).into_owned();
94                            clip.encrypted = false;
95                        }
96                        Err(e) => {
97                            warn!(
98                                clip_id = %clip.clip_id,
99                                error  = %e,
100                                "backfill: skipping encrypted clip — decryption failed"
101                            );
102                            continue;
103                        }
104                    }
105                }
106            }
107        }
108
109        let stored = match map::clip_wire_to_stored(&clip).map_err(BackfillError::Map)? {
110            Some(c) => c,
111            None => continue,
112        };
113        // Track the lexicographically largest ULID to use as the new watermark.
114        // ULIDs sort lexicographically in time order, so the max string ID is
115        // the most recent clip.
116        if max_id
117            .as_deref()
118            .map(|m| stored.id.as_str() > m)
119            .unwrap_or(true)
120        {
121            max_id = Some(stored.id.clone());
122        }
123        queries::insert_clip(store, &stored).map_err(BackfillError::Store)?;
124        inserted += 1;
125    }
126
127    if let Some(id) = max_id {
128        queries::set_watermark(store, &id).map_err(BackfillError::Store)?;
129    }
130
131    Ok(inserted)
132}
133
134/// Errors that can occur during a backfill pass.
135#[derive(Debug, thiserror::Error)]
136pub enum BackfillError {
137    #[error("store: {0}")]
138    Store(#[from] StoreError),
139    #[error("http: {0}")]
140    Http(HttpError),
141    #[error("map: {0}")]
142    Map(String),
143}