Skip to main content

cloudillo_profile/
sync.rs

//! Profile synchronization from remote instances
2
3use crate::prelude::*;
4use cloudillo_core::request::ConditionalResult;
5use cloudillo_core::scheduler::Task;
6use cloudillo_types::meta_adapter::{
7	Profile, ProfileConnectionStatus, ProfileType, UpdateProfileData,
8};
9use cloudillo_types::types::ApiResponse;
10
11use async_trait::async_trait;
12use futures::stream::{self, StreamExt};
13use serde::{Deserialize, Serialize};
14use std::sync::Arc;
15
/// Age in seconds after which a synced profile counts as stale: 24 hours.
const STALE_PROFILE_THRESHOLD_SECS: i64 = 24 * 60 * 60;
/// Upper bound on the number of stale profiles handled by one batch run.
const BATCH_SIZE: u32 = 100;
20
21/// Remote profile response from /me endpoint
22#[derive(Debug, serde::Deserialize)]
23#[serde(rename_all = "camelCase")]
24pub struct RemoteProfile {
25	pub id_tag: String,
26	pub name: String,
27	#[serde(rename = "type")]
28	pub r#type: String,
29	pub profile_pic: Option<String>,
30	pub cover_pic: Option<String>,
31}
32
33/// Ensure a profile exists locally by fetching from remote if needed.
34///
35/// This function:
36/// 1. Checks if the profile already exists locally
37/// 2. If not, fetches the profile from the remote instance
38/// 3. Creates the profile locally with the fetched data
39///
40/// Returns Ok(true) if the profile was synced (created), Ok(false) if it already existed.
41pub async fn ensure_profile(app: &App, tn_id: TnId, id_tag: &str) -> ClResult<bool> {
42	// Check if profile already exists
43	if app.meta_adapter.read_profile(tn_id, id_tag).await.is_ok() {
44		tracing::debug!("Profile {} already exists locally", id_tag);
45		return Ok(false);
46	}
47
48	// Fetch profile from remote instance
49	tracing::info!("Syncing profile {} from remote instance", id_tag);
50
51	let fetch_result: ClResult<ApiResponse<RemoteProfile>> =
52		app.request.get_noauth(tn_id, id_tag, "/me").await;
53
54	match fetch_result {
55		Ok(api_response) => {
56			let remote = api_response.data;
57
58			// Determine profile type
59			let typ = match remote.r#type.as_str() {
60				"community" => ProfileType::Community,
61				_ => ProfileType::Person,
62			};
63
64			// Sync profile picture FIRST if present, before creating profile
65			// Only include profile_pic in the profile if sync succeeds
66			let synced_profile_pic = if let Some(ref file_id) = remote.profile_pic {
67				match sync_profile_pic_variant(app, tn_id, id_tag, file_id).await {
68					Ok(_) => Some(file_id.as_str()),
69					Err(e) => {
70						tracing::warn!(
71							"Failed to sync profile picture for {}: {} (continuing without profile_pic)",
72							id_tag,
73							e
74						);
75						None
76					}
77				}
78			} else {
79				None
80			};
81
82			// Create local profile record (with profile_pic only if sync succeeded)
83			let profile = Profile {
84				id_tag: remote.id_tag.as_str(),
85				name: remote.name.as_str(),
86				typ,
87				profile_pic: synced_profile_pic,
88				following: false, // Will be set by the calling hook
89				connected: ProfileConnectionStatus::Disconnected,
90				roles: None,
91			};
92
93			// Generate a simple etag
94			let etag = format!("sync-{}", Timestamp::now().0);
95
96			app.meta_adapter.create_profile(tn_id, &profile, &etag).await?;
97
98			tracing::info!("Successfully synced profile {} from remote", id_tag);
99			Ok(true)
100		}
101		Err(e) => {
102			tracing::warn!("Failed to fetch profile {} from remote: {}", id_tag, e);
103			Err(e)
104		}
105	}
106}
107
108/// Refresh an existing profile from remote instance using conditional request.
109///
110/// This function:
111/// 1. Sends a conditional GET request with If-None-Match header using stored etag
112/// 2. If 304 Not Modified: only updates synced_at timestamp
113/// 3. If 200 OK: updates profile data (name, profile_pic) and synced_at
114/// 4. Syncs profile picture if changed
115///
116/// Returns Ok(true) if profile data was updated, Ok(false) if not modified or on error.
117pub async fn refresh_profile(
118	app: &App,
119	tn_id: TnId,
120	id_tag: &str,
121	etag: Option<&str>,
122) -> ClResult<bool> {
123	tracing::debug!("Refreshing profile {} (etag: {:?})", id_tag, etag);
124
125	// Make conditional request to remote /me endpoint
126	let result: ClResult<ConditionalResult<ApiResponse<RemoteProfile>>> =
127		app.request.get_conditional(id_tag, "/me", etag).await;
128
129	match result {
130		Ok(ConditionalResult::NotModified) => {
131			// Profile hasn't changed, just update synced_at
132			tracing::debug!("Profile {} not modified (304), updating synced_at", id_tag);
133
134			let update = UpdateProfileData { synced: Patch::Value(true), ..Default::default() };
135
136			app.meta_adapter.update_profile(tn_id, id_tag, &update).await?;
137			Ok(false)
138		}
139		Ok(ConditionalResult::Modified { data: api_response, etag: new_etag }) => {
140			let remote = api_response.data;
141			tracing::info!(
142				"Profile {} modified, updating (name={}, profile_pic={:?}, etag={:?})",
143				id_tag,
144				remote.name,
145				remote.profile_pic,
146				new_etag
147			);
148
149			// Read current profile to check if profile_pic changed
150			let current_profile_pic = app
151				.meta_adapter
152				.read_profile(tn_id, id_tag)
153				.await
154				.ok()
155				.and_then(|(_, p)| p.profile_pic);
156
157			// Sync profile picture FIRST if it changed
158			// Only update profile_pic in database if sync succeeds
159			let profile_pic_changed =
160				remote.profile_pic.as_deref() != current_profile_pic.as_deref();
161			let profile_pic_synced = if profile_pic_changed {
162				if let Some(ref file_id) = remote.profile_pic {
163					match sync_profile_pic_variant(app, tn_id, id_tag, file_id).await {
164						Ok(_) => true,
165						Err(_) => {
166							// Error already logged in sync_file_variants
167							tracing::debug!("Keeping old profile picture for {}", id_tag);
168							false
169						}
170					}
171				} else {
172					// Remote has no profile pic - that's a valid sync
173					true
174				}
175			} else {
176				// No change needed
177				false
178			};
179
180			// Build update - only include profile_pic if sync succeeded
181			let mut update = UpdateProfileData {
182				name: Patch::Value(remote.name.into()),
183				synced: Patch::Value(true),
184				..Default::default()
185			};
186
187			// Only update profile_pic if we successfully synced it (or it was removed)
188			if profile_pic_synced {
189				update.profile_pic = Patch::Value(remote.profile_pic.clone().map(|s| s.into()));
190			}
191
192			// Only update etag if profile_pic sync succeeded (or wasn't needed)
193			// This ensures we'll retry on next sync if the picture failed
194			if profile_pic_synced || !profile_pic_changed {
195				if let Some(etag) = new_etag {
196					update.etag = Patch::Value(etag);
197				}
198			}
199
200			app.meta_adapter.update_profile(tn_id, id_tag, &update).await?;
201
202			Ok(true)
203		}
204		Err(e) => {
205			tracing::warn!("Failed to refresh profile {}: {}", id_tag, e);
206			// Don't propagate error - just log and return false
207			// Still update synced_at to avoid repeated retries
208			let update = UpdateProfileData { synced: Patch::Value(true), ..Default::default() };
209			let _ = app.meta_adapter.update_profile(tn_id, id_tag, &update).await;
210			Ok(false)
211		}
212	}
213}
214
215/// Sync the 'pf' (profile) variant of a profile picture from a remote instance.
216///
217/// Uses the unified file sync helper to fetch the file descriptor,
218/// verify hashes, and sync only the 'pf' variant.
219///
220/// Returns Ok(()) if vis.pf was successfully synced or already exists locally.
221/// Returns Err if vis.pf variant doesn't exist in the remote descriptor or sync failed.
222async fn sync_profile_pic_variant(
223	app: &App,
224	tn_id: TnId,
225	id_tag: &str,
226	file_id: &str,
227) -> ClResult<()> {
228	use cloudillo_file::sync::sync_file_variants;
229
230	tracing::debug!(
231		"Syncing profile picture variant 'vis.pf' for {} (file_id: {})",
232		id_tag,
233		file_id
234	);
235
236	// Sync only the 'vis.pf' variant for profile pictures (public, no auth needed)
237	let result = sync_file_variants(app, tn_id, id_tag, file_id, Some(&["vis.pf"]), false).await?;
238
239	// Check if vis.pf was specifically synced or skipped (already exists)
240	let vis_pf_synced = result.synced_variants.iter().any(|v| v == "vis.pf");
241	let vis_pf_skipped = result.skipped_variants.iter().any(|v| v == "vis.pf");
242	let vis_pf_failed = result.failed_variants.iter().any(|v| v == "vis.pf");
243
244	if vis_pf_synced {
245		tracing::info!(
246			"Synced profile picture variant 'vis.pf' for {} (file_id: {})",
247			id_tag,
248			file_id
249		);
250		Ok(())
251	} else if vis_pf_skipped {
252		tracing::debug!(
253			"Profile picture variant 'vis.pf' already exists for {} (file_id: {})",
254			id_tag,
255			file_id
256		);
257		Ok(())
258	} else if vis_pf_failed {
259		tracing::warn!(
260			"Failed to sync profile picture variant 'vis.pf' for {} (file_id: {})",
261			id_tag,
262			file_id
263		);
264		Err(Error::NetworkError("vis.pf variant sync failed".into()))
265	} else {
266		// vis.pf wasn't in synced, skipped, or failed - means it doesn't exist in the descriptor
267		tracing::warn!(
268			"No 'vis.pf' variant found in descriptor for profile picture {} (file_id: {})",
269			id_tag,
270			file_id
271		);
272		Err(Error::NotFound)
273	}
274}
275
276/// Batch task for refreshing stale profiles
277///
278/// This task queries profiles that haven't been synced in 24 hours
279/// and refreshes them from their remote instances.
280#[derive(Debug, Clone, Serialize, Deserialize)]
281pub struct ProfileRefreshBatchTask;
282
283#[async_trait]
284impl Task<App> for ProfileRefreshBatchTask {
285	fn kind() -> &'static str {
286		"profile.refresh_batch"
287	}
288
289	fn kind_of(&self) -> &'static str {
290		Self::kind()
291	}
292
293	fn build(_id: u64, _context: &str) -> ClResult<Arc<dyn Task<App>>> {
294		Ok(Arc::new(ProfileRefreshBatchTask))
295	}
296
297	fn serialize(&self) -> String {
298		"{}".to_string()
299	}
300
301	async fn run(&self, app: &App) -> ClResult<()> {
302		info!("Starting profile refresh batch task");
303
304		// Query stale profiles
305		let stale_profiles = app
306			.meta_adapter
307			.list_stale_profiles(STALE_PROFILE_THRESHOLD_SECS, BATCH_SIZE)
308			.await?;
309
310		let total = stale_profiles.len();
311		if total == 0 {
312			info!("No stale profiles to refresh");
313			return Ok(());
314		}
315
316		info!("Found {} stale profiles to refresh", total);
317
318		// Process profiles concurrently (up to 10 at a time)
319		let results: Vec<_> = stream::iter(stale_profiles)
320			.map(|(tn_id, id_tag, etag)| {
321				let app = app.clone();
322				async move {
323					let result = refresh_profile(&app, tn_id, &id_tag, etag.as_deref()).await;
324					(id_tag, result)
325				}
326			})
327			.buffer_unordered(10)
328			.collect()
329			.await;
330
331		// Count results
332		let mut refreshed = 0;
333		let mut not_modified = 0;
334		let mut errors = 0;
335
336		for (id_tag, result) in results {
337			match result {
338				Ok(true) => refreshed += 1,
339				Ok(false) => not_modified += 1,
340				Err(e) => {
341					warn!("Error refreshing profile {}: {}", id_tag, e);
342					errors += 1;
343				}
344			}
345		}
346
347		info!(
348			"Profile refresh batch complete: {} refreshed, {} not modified, {} errors",
349			refreshed, not_modified, errors
350		);
351
352		Ok(())
353	}
354}
355
// vim: ts=4