1use anyhow::{anyhow, Result};
6use uuid::Uuid;
7
8use crate::bucket_log::BucketLogProvider;
9use crate::crypto::PublicKey;
10use crate::linked_data::Link;
11use crate::mount::Manifest;
12use crate::peer::Peer;
13
14use super::{DownloadPinsJob, SyncJob};
15
16#[derive(Debug, Clone)]
18pub struct SyncTarget {
19 pub link: Link,
21 pub height: u64,
23 pub peer_id: PublicKey,
25}
26
27#[derive(Debug, Clone)]
29pub struct SyncBucketJob {
30 pub bucket_id: Uuid,
31 pub target: SyncTarget,
32}
33
34pub async fn execute<L>(peer: &Peer<L>, job: SyncBucketJob) -> Result<()>
40where
41 L: BucketLogProvider + Clone + Send + Sync + 'static,
42 L::Error: std::error::Error + Send + Sync + 'static,
43{
44 tracing::info!(
45 "Syncing bucket {} from peer {} to link {:?} at height {}",
46 job.bucket_id,
47 job.target.peer_id.to_hex(),
48 job.target.link,
49 job.target.height
50 );
51
52 let exists: bool = peer.logs().exists(job.bucket_id).await?;
53
54 let common_ancestor = if exists {
55 find_common_ancestor(peer, job.bucket_id, &job.target.link, &job.target.peer_id).await?
58 } else {
59 None
60 };
61
62 if exists && common_ancestor.is_none() {
69 tracing::warn!(
70 "Bucket {} diverged from peer {:?}",
71 job.bucket_id,
72 job.target.peer_id
73 );
74 return Ok(());
75 }
76
77 let stop_link_owned = common_ancestor.as_ref().map(|ancestor| ancestor.0.clone());
79 let stop_link = stop_link_owned.as_ref();
80
81 if stop_link.is_none() && common_ancestor.is_none() {
82 tracing::info!(
84 "No common ancestor for bucket {}, syncing from genesis",
85 job.bucket_id
86 );
87 }
88
89 let manifests =
94 download_manifest_chain(peer, &job.target.link, stop_link, &job.target.peer_id).await?;
95
96 if manifests.is_empty() {
100 tracing::info!("No new manifests to sync, already up to date");
101 return Ok(());
102 };
103
104 if !verify_provenance(peer, &manifests.last().unwrap().0)? {
108 tracing::warn!("Provenance verification failed: our key not in bucket shares");
109 return Ok(());
110 }
111
112 apply_manifest_chain(peer, job.bucket_id, &manifests).await?;
114
115 Ok(())
116}
117
118async fn download_manifest_chain<L>(
125 peer: &Peer<L>,
126 start_link: &Link,
127 stop_link: Option<&Link>,
128 peer_id: &PublicKey,
130) -> Result<Vec<(Manifest, Link)>>
131where
132 L: BucketLogProvider + Clone + Send + Sync + 'static,
133 L::Error: std::error::Error + Send + Sync + 'static,
134{
135 tracing::debug!(
136 "Downloading manifest chain from {:?}, stop_at: {:?}",
137 start_link,
138 stop_link
139 );
140
141 let mut manifests = Vec::new();
142 let mut current_link = start_link.clone();
143
144 loop {
146 peer.blobs()
148 .download_hash(current_link.hash(), vec![*peer_id], peer.endpoint())
149 .await
150 .map_err(|e| {
151 anyhow!(
152 "Failed to download manifest {:?} from peer: {}",
153 current_link,
154 e
155 )
156 })?;
157
158 let manifest: Manifest = peer.blobs().get_cbor(¤t_link.hash()).await?;
160
161 if let Some(stop_link) = stop_link {
163 if ¤t_link == stop_link {
164 tracing::debug!("Reached stop_at link, stopping download");
165 break;
166 }
167 }
168
169 manifests.push((manifest.clone(), current_link.clone()));
170
171 match manifest.previous() {
173 Some(prev_link) => {
174 current_link = prev_link.clone();
175 }
176 None => {
177 tracing::debug!("Reached genesis manifest, stopping download");
179 break;
180 }
181 }
182 }
183
184 manifests.reverse();
186
187 tracing::debug!("Downloaded {} manifests", manifests.len());
188 Ok(manifests)
189}
190
191async fn find_common_ancestor<L>(
210 peer: &Peer<L>,
211 bucket_id: Uuid,
212 link: &Link,
213 peer_id: &PublicKey,
214) -> Result<Option<(Link, u64)>>
215where
216 L: BucketLogProvider + Clone + Send + Sync + 'static,
217 L::Error: std::error::Error + Send + Sync + 'static,
218{
219 tracing::debug!(
220 "Finding common ancestor starting from {:?} with peer {}",
221 link,
222 peer_id.to_hex()
223 );
224
225 let mut current_link = link.clone();
226 let mut manifests_checked = 0;
227
228 loop {
229 manifests_checked += 1;
230 tracing::debug!(
231 "Checking manifest {} at link {:?}",
232 manifests_checked,
233 current_link
234 );
235
236 peer.blobs()
240 .download_hash(current_link.hash(), vec![*peer_id], peer.endpoint())
241 .await
242 .map_err(|e| {
243 anyhow!(
244 "Failed to download manifest {:?} from peer: {}",
245 current_link,
246 e
247 )
248 })?;
249
250 let manifest: Manifest = peer.blobs().get_cbor(¤t_link.hash()).await?;
252 let height = manifest.height();
253
254 match peer.logs().has(bucket_id, current_link.clone()).await {
256 Ok(heights) if !heights.is_empty() => {
257 tracing::info!(
258 "Found common ancestor at link {:?} with height {} (in our log at heights {:?})",
259 current_link,
260 height,
261 heights
262 );
263 return Ok(Some((current_link, height)));
264 }
265 Ok(_) => {
266 tracing::debug!("Link {:?} not in our log, checking previous", current_link);
268 }
269 Err(e) => {
270 tracing::warn!("Error checking for link in log: {}", e);
271 }
273 }
274
275 match manifest.previous() {
277 Some(prev_link) => {
278 current_link = prev_link.clone();
279 }
280 None => {
281 tracing::debug!(
283 "Reached genesis after checking {} manifests, no common ancestor found",
284 manifests_checked
285 );
286 return Ok(None);
287 }
288 }
289 }
290}
291
292async fn apply_manifest_chain<L>(
296 peer: &Peer<L>,
297 bucket_id: Uuid,
298 manifests: &[(Manifest, Link)],
299) -> Result<()>
300where
301 L: BucketLogProvider + Clone + Send + Sync + 'static,
302 L::Error: std::error::Error + Send + Sync + 'static,
303{
304 tracing::info!("Applying {} manifests to log", manifests.len(),);
305
306 if let Some((_i, (manifest, link))) = manifests.iter().enumerate().next() {
307 let previous = manifest.previous().clone();
308 let height = manifest.height();
309
310 tracing::info!(
311 "Appending manifest to log: height={}, link={:?}, previous={:?}",
312 height,
313 link,
314 previous
315 );
316
317 peer.logs()
318 .append(
319 bucket_id,
320 manifest.name().to_string(),
321 link.clone(),
322 previous,
323 height,
324 )
325 .await
326 .map_err(|e| anyhow!("Failed to append manifest at height {}: {}", height, e))?;
327
328 let pins_link = manifest.pins().clone();
329 let peer_ids = manifest
330 .shares()
331 .iter()
332 .map(|share| share.1.principal().identity)
333 .collect();
334 return peer
335 .dispatch(SyncJob::DownloadPins(DownloadPinsJob {
336 pins_link,
337 peer_ids,
338 }))
339 .await;
340 }
341
342 tracing::info!("Successfully applied {} manifests to log", manifests.len());
343 Ok(())
344}
345
346fn verify_provenance<L>(peer: &Peer<L>, manifest: &Manifest) -> Result<bool>
348where
349 L: BucketLogProvider + Clone + Send + Sync + 'static,
350 L::Error: std::error::Error + Send + Sync + 'static,
351{
352 let our_pub_key = PublicKey::from(*peer.secret().public());
353 let our_pub_key_hex = our_pub_key.to_hex();
354
355 let is_authorized = manifest
357 .shares()
358 .iter()
359 .any(|(key_hex, _share)| key_hex == &our_pub_key_hex);
360
361 tracing::debug!(
362 "Provenance check: our_key={}, authorized={}",
363 our_pub_key_hex,
364 is_authorized
365 );
366
367 Ok(is_authorized)
368}