Skip to main content

cloudillo_file/
sync.rs

1//! File synchronization from remote instances
2//!
3//! Provides unified file sync functionality for both action attachments
4//! and profile picture synchronization.
5
6use std::collections::{HashMap, HashSet};
7
8use crate::prelude::*;
9use crate::variant::{Variant, VariantClass};
10use cloudillo_types::blob_adapter::CreateBlobOptions;
11use cloudillo_types::hasher;
12use cloudillo_types::meta_adapter::{CreateFile, FileId, FileVariant};
13use cloudillo_types::types::ApiResponse;
14
/// Result of a file sync operation
///
/// Collects, per variant name, what `sync_file_variants` did with each variant
/// listed in the remote descriptor. Derives `Clone`, `PartialEq` and `Eq` so
/// results can be duplicated and compared directly (e.g. in tests).
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct SyncResult {
	/// ID of the file the sync was requested for
	pub file_id: String,
	/// Variants whose content was fetched from the remote and stored locally
	pub synced_variants: Vec<String>,
	/// Variants that needed no download: the record or blob already existed,
	/// or the variant was recorded as metadata-only
	pub skipped_variants: Vec<String>,
	/// Variants whose content could not be fetched or stored
	pub failed_variants: Vec<String>,
}
23
/// Variant qualities in size order, used to compare a variant against the
/// configured maximum (max_cache_variant style settings)
const VARIANT_ORDER: &[&str] = &["tn", "pf", "sd", "md", "hd", "xd"];

/// Decide whether `variant` is within the limit given by `max_variant`
///
/// Both names are ranked by their position in `VARIANT_ORDER`. A name that is
/// not in the table ranks above every known one, so an unknown limit allows
/// everything and an unknown variant only passes an unknown limit.
fn should_sync_variant(variant: &str, max_variant: &str) -> bool {
	let rank_of = |name: &str| VARIANT_ORDER.iter().position(|&known| known == name);

	match (rank_of(variant), rank_of(max_variant)) {
		// Unrecognised limit: nothing is filtered out
		(_, None) => true,
		// Unrecognised variant never fits under a recognised limit
		(None, Some(_)) => false,
		(Some(variant_rank), Some(max_rank)) => variant_rank <= max_rank,
	}
}
33
34/// Get the sync setting key for a variant class (returns None for doc/raw - always sync)
35pub fn get_sync_setting_key(class: VariantClass) -> Option<&'static str> {
36	match class {
37		VariantClass::Visual => Some("file.sync_max_vis"),
38		VariantClass::Video => Some("file.sync_max_vid"),
39		VariantClass::Audio => Some("file.sync_max_aud"),
40		VariantClass::Document | VariantClass::Raw => None, // Always sync
41	}
42}
43
44/// Get the default sync max for a variant class
45pub fn get_default_sync_max(class: VariantClass) -> &'static str {
46	match class {
47		VariantClass::Visual => "md",
48		VariantClass::Video => "sd",
49		VariantClass::Audio => "md",
50		VariantClass::Document | VariantClass::Raw => "orig", // Always sync all
51	}
52}
53
/// Pick the instance to sync from, based on the action context
///
/// When an audience tag is present and differs from our own tenant tag, the
/// audience is the source. In every other case (no audience, or we are the
/// audience ourselves) the issuer is the source.
pub fn get_sync_source<'a>(
	tenant_tag: &str,
	audience_tag: Option<&'a str>,
	issuer_tag: &'a str,
) -> &'a str {
	audience_tag.filter(|aud| *aud != tenant_tag).unwrap_or(issuer_tag)
}
68
/// Tell whether this tenant is the audience (in which case ALL variants must be synced)
///
/// Returns `false` when no audience tag is set.
pub fn is_audience(tenant_tag: &str, audience_tag: Option<&str>) -> bool {
	matches!(audience_tag, Some(aud) if aud == tenant_tag)
}
73
74/// Verify that content hash matches expected ID
75///
76/// IDs are formatted as `prefix~hash` (e.g., `b1~abc123`, `f1~xyz789`, `d1~...`)
77/// Returns Ok(()) if hash matches, Err if mismatch
78fn verify_content_hash(data: &[u8], expected_id: &str) -> ClResult<()> {
79	// Extract prefix from expected ID (e.g., "b" from "b1~hash", "f" from "f1~hash")
80	let prefix = expected_id
81		.find('1')
82		.map(|pos| &expected_id[..pos])
83		.ok_or_else(|| Error::ValidationError(format!("Invalid ID format: {}", expected_id)))?;
84
85	// Compute hash using the same hasher that generates IDs
86	let computed_id = hasher::hash(prefix, data);
87
88	if computed_id.as_ref() != expected_id {
89		return Err(Error::ValidationError(format!(
90			"Hash mismatch: expected {}, got {}",
91			expected_id, computed_id
92		)));
93	}
94
95	Ok(())
96}
97
98/// Sync file variants from a remote instance
99///
100/// # Arguments
101/// * `app` - Application state
102/// * `tn_id` - Tenant ID
103/// * `remote_id_tag` - Remote instance id_tag to fetch from
104/// * `file_id` - The file ID to sync
105/// * `variants` - Optional list of specific variants to sync (None = all up to max setting)
106/// * `auth` - Whether to use authenticated requests (true for direct-visibility files, false for public)
107///
108/// # Returns
109/// Ok(SyncResult) with details of what was synced
110pub async fn sync_file_variants(
111	app: &App,
112	tn_id: TnId,
113	remote_id_tag: &str,
114	file_id: &str,
115	variants: Option<&[&str]>,
116	auth: bool,
117) -> ClResult<SyncResult> {
118	let mut result = SyncResult { file_id: file_id.to_string(), ..Default::default() };
119
120	debug!("Syncing file {} from {}", file_id, remote_id_tag);
121
122	// 1. Fetch file descriptor from remote
123	let descriptor_path = format!("/files/{}/descriptor", file_id);
124	let descriptor_response: ApiResponse<String> = if auth {
125		app.request.get(tn_id, remote_id_tag, &descriptor_path).await?
126	} else {
127		app.request.get_noauth(tn_id, remote_id_tag, &descriptor_path).await?
128	};
129	let descriptor = &descriptor_response.data;
130
131	debug!("  fetched descriptor: {}", descriptor);
132
133	// 3. Parse descriptor to get variant info (parse first for debugging)
134	let mut parsed_variants = super::descriptor::parse_file_descriptor(descriptor)?;
135
136	// 2. Verify descriptor hash matches file_id
137	if let Err(e) = verify_content_hash(descriptor.as_bytes(), file_id) {
138		// Generate local descriptor from parsed variants to compare
139		parsed_variants.sort();
140		let local_descriptor = super::descriptor::get_file_descriptor(&parsed_variants);
141		warn!(
142			"Descriptor hash mismatch for {}:\n  fetched:   {}\n  generated: {}\n  error: {}",
143			file_id, descriptor, local_descriptor, e
144		);
145		return Err(e);
146	}
147
148	if parsed_variants.is_empty() {
149		warn!("No variants found in descriptor for {}", file_id);
150		return Ok(result);
151	}
152
153	// 4. Filter variants to sync using per-class settings
154	let variants_to_sync: Vec<_> = if let Some(explicit_variants) = variants {
155		// Explicit variant list provided - only sync those
156		parsed_variants
157			.iter()
158			.filter(|v| explicit_variants.contains(&v.variant))
159			.collect()
160	} else {
161		// Use per-class sync settings
162		let mut class_max_variants: HashMap<VariantClass, String> = HashMap::new();
163
164		// Build map of class -> max_variant from settings
165		for variant in &parsed_variants {
166			let class =
167				Variant::parse(variant.variant).map(|v| v.class).unwrap_or(VariantClass::Visual);
168
169			if let std::collections::hash_map::Entry::Vacant(e) = class_max_variants.entry(class) {
170				if let Some(setting_key) = get_sync_setting_key(class) {
171					let max_variant = app
172						.settings
173						.get_string_opt(tn_id, setting_key)
174						.await
175						.ok()
176						.flatten()
177						.unwrap_or_else(|| get_default_sync_max(class).to_string());
178					e.insert(max_variant);
179				}
180				// For doc/raw, no entry = sync all
181			}
182		}
183
184		// Filter variants using per-class max settings
185		parsed_variants
186			.iter()
187			.filter(|v| {
188				let class =
189					Variant::parse(v.variant).map(|vp| vp.class).unwrap_or(VariantClass::Visual);
190
191				// Doc/Raw always sync (no setting key)
192				let Some(max_variant) = class_max_variants.get(&class).map(|s| s.as_str()) else {
193					return true; // No limit = sync all
194				};
195
196				let quality =
197					Variant::parse(v.variant).map(|vp| vp.quality.as_str()).unwrap_or(v.variant);
198
199				should_sync_variant(quality, max_variant)
200			})
201			.collect()
202	};
203
204	// Build a set of variant names that should have their content synced
205	let variants_to_sync_set: HashSet<&str> = variants_to_sync.iter().map(|v| v.variant).collect();
206
207	debug!(
208		"Variants to sync content for: {:?}, total variants: {}",
209		variants_to_sync_set,
210		parsed_variants.len()
211	);
212
213	// 6. Check if file already exists by file_id and get its f_id
214	let existing_f_id = app.meta_adapter.read_f_id_by_file_id(tn_id, file_id).await.ok();
215
216	// Also get existing variant records to know which ones need to be created
217	let existing_variants: Vec<String> = if existing_f_id.is_some() {
218		app.meta_adapter
219			.list_file_variants(tn_id, cloudillo_types::meta_adapter::FileId::FileId(file_id))
220			.await
221			.map(|v| v.iter().map(|fv| fv.variant.to_string()).collect())
222			.unwrap_or_default()
223	} else {
224		vec![]
225	};
226
227	let (f_id, is_new_file): (Option<u64>, bool) = if let Some(f_id) = existing_f_id {
228		// File already exists - use its f_id to add missing variants
229		debug!("File {} already exists (f_id={}), syncing missing variants", file_id, f_id);
230		(Some(f_id), false)
231	} else {
232		// Create file entry with file_id and status='P' (pending)
233		// Variants can be added to pending files, then finalize sets status='A'
234		// Use first variant from parsed_variants (always available) for file creation
235		let first_variant = &parsed_variants[0];
236		let create_opts = CreateFile {
237			orig_variant_id: Some(first_variant.variant_id.into()),
238			file_id: Some(file_id.into()), // Set file_id (enables deduplication)
239			parent_id: None,
240			owner_tag: None, // Owned by tenant, not remote user
241			creator_tag: None,
242			preset: Some("sync".into()),
243			content_type: format_to_content_type(first_variant.format).into(),
244			file_name: format!("synced.{}", format_to_extension(first_variant.format)).into(),
245			file_tp: None,
246			created_at: Some(Timestamp::now()),
247			tags: None,
248			x: None,
249			visibility: Some('D'), // Direct visibility for synced files
250			status: None,          // Default to 'P' (pending)
251		};
252
253		match app.meta_adapter.create_file(tn_id, create_opts).await {
254			Ok(FileId::FId(f_id)) => (Some(f_id), true),
255			Ok(FileId::FileId(_)) => {
256				// Matched by orig_variant_id - shouldn't happen often but handle it
257				debug!("File {} matched existing by orig_variant_id", file_id);
258				(None, false)
259			}
260			Err(e) => {
261				warn!("Failed to create file entry for {}: {}", file_id, e);
262				return Err(e);
263			}
264		}
265	};
266
267	// 7. Process ALL variants from the descriptor
268	// Create metadata records for all variants, but only sync content for selected ones
269	for variant in &parsed_variants {
270		let variant_id = variant.variant_id;
271		let variant_name = variant.variant;
272		let should_sync_content = variants_to_sync_set.contains(variant_name);
273
274		// Check if this variant record already exists in the database
275		let variant_record_exists = existing_variants.iter().any(|v| v == variant_name);
276
277		if variant_record_exists {
278			// Variant record already exists - skip
279			debug!("  variant {} record already exists, skipping", variant_name);
280			result.skipped_variants.push(variant_name.to_string());
281			continue;
282		}
283
284		// Determine blob size and availability
285		let (blob_size, available) = if should_sync_content {
286			// This variant should have its content synced
287			let blob_exists = app.blob_adapter.stat_blob(tn_id, variant_id).await.is_some();
288
289			if blob_exists {
290				// Blob already exists - use its size
291				let size = app.blob_adapter.stat_blob(tn_id, variant_id).await.unwrap_or(0);
292				debug!("  variant {} blob already exists", variant_name);
293				result.skipped_variants.push(variant_name.to_string());
294				(size, true)
295			} else {
296				// Fetch variant data from remote
297				match fetch_and_store_blob(
298					app,
299					tn_id,
300					remote_id_tag,
301					variant_id,
302					variant_name,
303					auth,
304				)
305				.await
306				{
307					Ok(size) => {
308						info!("  synced variant {} ({})", variant_name, variant_id);
309						result.synced_variants.push(variant_name.to_string());
310						(size, true)
311					}
312					Err(e) => {
313						warn!("  failed to sync variant {}: {}", variant_name, e);
314						result.failed_variants.push(variant_name.to_string());
315						continue; // Skip creating record for failed fetches
316					}
317				}
318			}
319		} else {
320			// This variant is metadata-only (not syncing content)
321			debug!("  variant {} metadata-only (not syncing content)", variant_name);
322			result.skipped_variants.push(variant_name.to_string());
323			(variant.size, false) // Use size from descriptor, mark as unavailable
324		};
325
326		// Create file variant record in MetaAdapter
327		if let Some(f_id) = f_id {
328			let file_variant = FileVariant {
329				variant_id,
330				variant: variant_name,
331				format: variant.format,
332				resolution: variant.resolution,
333				size: blob_size,
334				available,
335				duration: None,
336				bitrate: None,
337				page_count: None,
338			};
339
340			if let Err(e) = app.meta_adapter.create_file_variant(tn_id, f_id, file_variant).await {
341				warn!("  failed to create variant record for {}: {}", variant_name, e);
342			}
343		}
344	}
345
346	// 8. Finalize the file by setting file_id (only if we created a new file entry)
347	if is_new_file {
348		if let Some(f_id) = f_id {
349			if let Err(e) = app.meta_adapter.finalize_file(tn_id, f_id, file_id).await {
350				warn!("Failed to finalize file {}: {}", file_id, e);
351				// Variants are synced, just finalization failed
352			}
353		}
354	}
355
356	info!(
357		"File sync complete for {}: {} synced, {} skipped, {} failed",
358		file_id,
359		result.synced_variants.len(),
360		result.skipped_variants.len(),
361		result.failed_variants.len()
362	);
363
364	Ok(result)
365}
366
367/// Fetch a variant blob from remote and store it locally
368///
369/// Returns the blob size on success
370async fn fetch_and_store_blob(
371	app: &App,
372	tn_id: TnId,
373	remote_id_tag: &str,
374	variant_id: &str,
375	variant_name: &str,
376	auth: bool,
377) -> ClResult<u64> {
378	let variant_path = format!("/files/variant/{}", variant_id);
379	let bytes = app.request.get_bin(tn_id, remote_id_tag, &variant_path, auth).await?;
380
381	if bytes.is_empty() {
382		warn!("  variant {} returned empty data", variant_name);
383		return Err(Error::ValidationError("empty variant data".into()));
384	}
385
386	// Verify blob hash matches variant_id
387	verify_content_hash(&bytes, variant_id).map_err(|e| {
388		error!("  variant {} hash mismatch: {}", variant_name, e);
389		e
390	})?;
391
392	let blob_size = bytes.len() as u64;
393
394	// Store blob
395	app.blob_adapter
396		.create_blob_buf(tn_id, variant_id, &bytes, &CreateBlobOptions::default())
397		.await
398		.map_err(|e| {
399			warn!("  failed to store blob for variant {}: {}", variant_name, e);
400			e
401		})?;
402
403	Ok(blob_size)
404}
405
/// Map a variant format name to its MIME content type
///
/// Matching ignores letter case. Formats that are not in the table map to
/// `application/octet-stream`.
fn format_to_content_type(format: &str) -> &'static str {
	const CONTENT_TYPES: &[(&str, &str)] = &[
		("webp", "image/webp"),
		("avif", "image/avif"),
		("jpeg", "image/jpeg"),
		("jpg", "image/jpeg"),
		("png", "image/png"),
		("gif", "image/gif"),
	];

	CONTENT_TYPES
		.iter()
		.find(|(name, _)| format.eq_ignore_ascii_case(name))
		.map_or("application/octet-stream", |&(_, content_type)| content_type)
}
417
/// Map a variant format name to a file extension
///
/// Matching ignores letter case. Both `jpeg` and `jpg` yield `jpg`; formats
/// that are not in the table yield `bin`.
fn format_to_extension(format: &str) -> &'static str {
	const EXTENSIONS: &[(&str, &str)] = &[
		("webp", "webp"),
		("avif", "avif"),
		("jpeg", "jpg"),
		("jpg", "jpg"),
		("png", "png"),
		("gif", "gif"),
	];

	EXTENSIONS
		.iter()
		.find(|(name, _)| format.eq_ignore_ascii_case(name))
		.map_or("bin", |&(_, extension)| extension)
}
429
430// vim: ts=4