client_core/sync/reader.rs
1use super::map;
2use crate::crypto;
3use crate::http::{HttpError, RestClient};
4use crate::store::{queries, Store, StoreError};
5use std::time::{Duration, Instant};
6use tracing::warn;
7
8/// Wall-time and volume budget for a single backfill pass.
9pub struct BackfillBudget {
10 /// Maximum elapsed time before the pass is cut short.
11 pub max_wall: Duration,
12 /// Maximum number of clips to fetch from the relay in one pass.
13 pub max_clips: u32,
14}
15
16impl Default for BackfillBudget {
17 fn default() -> Self {
18 Self {
19 max_wall: Duration::from_secs(2),
20 max_clips: 500,
21 }
22 }
23}
24
25/// Convert a stored ULID watermark string to a `chrono::DateTime<chrono::Utc>`
26/// so it can be passed to [`RestClient::list_clips_since`].
27///
28/// Returns `None` if the string is empty or cannot be parsed as a ULID.
29fn ulid_to_datetime(ulid_str: &str) -> Option<chrono::DateTime<chrono::Utc>> {
30 use std::time::UNIX_EPOCH;
31 let ulid = ulid::Ulid::from_string(ulid_str).ok()?;
32 let ms = ulid.datetime().duration_since(UNIX_EPOCH).ok()?.as_millis() as i64;
33 chrono::DateTime::from_timestamp_millis(ms)
34}
35
36/// One pass of REST backfill from `meta.last_sync_watermark` forward.
37///
38/// Fetches clips newer than the stored watermark, decrypts any encrypted clips
39/// using `enc_key` (when provided), writes them into the local store, and
40/// advances the watermark to the highest clip ID seen. Returns the number of
41/// clips inserted.
42///
43/// The HTTP layer does **not** decrypt — callers must supply the AES-256 key
44/// when the account uses client-side encryption. If an encrypted clip is
45/// encountered and `enc_key` is `None`, or decryption fails, the clip is
46/// logged and skipped (same policy as design doc §9 risk item).
47pub async fn backfill_once(
48 store: &Store,
49 client: &RestClient,
50 budget: BackfillBudget,
51 enc_key: Option<&[u8; 32]>,
52) -> Result<usize, BackfillError> {
53 let start = Instant::now();
54
55 // Resolve the watermark: stored as a ULID → extract the embedded timestamp
56 // and pass it as the `since` parameter so the relay returns only newer clips.
57 let since = queries::watermark(store)
58 .map_err(BackfillError::Store)?
59 .and_then(|w| ulid_to_datetime(&w));
60
61 let clips = client
62 .list_clips_since(since, budget.max_clips)
63 .await
64 .map_err(BackfillError::Http)?;
65
66 let mut inserted = 0usize;
67 let mut max_id: Option<String> = None;
68
69 for mut clip in clips {
70 if start.elapsed() >= budget.max_wall {
71 break;
72 }
73
74 // Decrypt before mapping: the HTTP layer returns raw wire bytes and
75 // does not decrypt. Skip clips we cannot decrypt rather than storing
76 // ciphertext, which would break FTS5 search and downstream rendering.
77 if clip.encrypted {
78 match enc_key {
79 None => {
80 warn!(
81 clip_id = %clip.clip_id,
82 "backfill: skipping encrypted clip — no encryption key available"
83 );
84 continue;
85 }
86 Some(key) => {
87 match crypto::decrypt(key, &clip.content) {
88 Ok(plaintext) => {
89 // Store decrypted content as UTF-8. Non-UTF-8 bytes
90 // (e.g. image data) are preserved via lossy conversion;
91 // callers using the image branch recover bytes from
92 // `StoredClip.content`.
93 clip.content = String::from_utf8_lossy(&plaintext).into_owned();
94 clip.encrypted = false;
95 }
96 Err(e) => {
97 warn!(
98 clip_id = %clip.clip_id,
99 error = %e,
100 "backfill: skipping encrypted clip — decryption failed"
101 );
102 continue;
103 }
104 }
105 }
106 }
107 }
108
109 let stored = match map::clip_wire_to_stored(&clip).map_err(BackfillError::Map)? {
110 Some(c) => c,
111 None => continue,
112 };
113 // Track the lexicographically largest ULID to use as the new watermark.
114 // ULIDs sort lexicographically in time order, so the max string ID is
115 // the most recent clip.
116 if max_id
117 .as_deref()
118 .map(|m| stored.id.as_str() > m)
119 .unwrap_or(true)
120 {
121 max_id = Some(stored.id.clone());
122 }
123 queries::insert_clip(store, &stored).map_err(BackfillError::Store)?;
124 inserted += 1;
125 }
126
127 if let Some(id) = max_id {
128 queries::set_watermark(store, &id).map_err(BackfillError::Store)?;
129 }
130
131 Ok(inserted)
132}
133
134/// Errors that can occur during a backfill pass.
135#[derive(Debug, thiserror::Error)]
136pub enum BackfillError {
137 #[error("store: {0}")]
138 Store(#[from] StoreError),
139 #[error("http: {0}")]
140 Http(HttpError),
141 #[error("map: {0}")]
142 Map(String),
143}