1use crate::prelude::*;
4use cloudillo_core::request::ConditionalResult;
5use cloudillo_core::scheduler::Task;
6use cloudillo_types::meta_adapter::{
7 Profile, ProfileConnectionStatus, ProfileType, UpdateProfileData,
8};
9use cloudillo_types::types::ApiResponse;
10
11use async_trait::async_trait;
12use futures::stream::{self, StreamExt};
13use serde::{Deserialize, Serialize};
14use std::sync::Arc;
15
16const STALE_PROFILE_THRESHOLD_SECS: i64 = 86400;
18const BATCH_SIZE: u32 = 100;
20
21#[derive(Debug, serde::Deserialize)]
23#[serde(rename_all = "camelCase")]
24pub struct RemoteProfile {
25 pub id_tag: String,
26 pub name: String,
27 #[serde(rename = "type")]
28 pub r#type: String,
29 pub profile_pic: Option<String>,
30 pub cover_pic: Option<String>,
31}
32
33pub async fn ensure_profile(app: &App, tn_id: TnId, id_tag: &str) -> ClResult<bool> {
42 if app.meta_adapter.read_profile(tn_id, id_tag).await.is_ok() {
44 tracing::debug!("Profile {} already exists locally", id_tag);
45 return Ok(false);
46 }
47
48 tracing::info!("Syncing profile {} from remote instance", id_tag);
50
51 let fetch_result: ClResult<ApiResponse<RemoteProfile>> =
52 app.request.get_noauth(tn_id, id_tag, "/me").await;
53
54 match fetch_result {
55 Ok(api_response) => {
56 let remote = api_response.data;
57
58 let typ = match remote.r#type.as_str() {
60 "community" => ProfileType::Community,
61 _ => ProfileType::Person,
62 };
63
64 let synced_profile_pic = if let Some(ref file_id) = remote.profile_pic {
67 match sync_profile_pic_variant(app, tn_id, id_tag, file_id).await {
68 Ok(_) => Some(file_id.as_str()),
69 Err(e) => {
70 tracing::warn!(
71 "Failed to sync profile picture for {}: {} (continuing without profile_pic)",
72 id_tag,
73 e
74 );
75 None
76 }
77 }
78 } else {
79 None
80 };
81
82 let profile = Profile {
84 id_tag: remote.id_tag.as_str(),
85 name: remote.name.as_str(),
86 typ,
87 profile_pic: synced_profile_pic,
88 following: false, connected: ProfileConnectionStatus::Disconnected,
90 roles: None,
91 };
92
93 let etag = format!("sync-{}", Timestamp::now().0);
95
96 app.meta_adapter.create_profile(tn_id, &profile, &etag).await?;
97
98 tracing::info!("Successfully synced profile {} from remote", id_tag);
99 Ok(true)
100 }
101 Err(e) => {
102 tracing::warn!("Failed to fetch profile {} from remote: {}", id_tag, e);
103 Err(e)
104 }
105 }
106}
107
108pub async fn refresh_profile(
118 app: &App,
119 tn_id: TnId,
120 id_tag: &str,
121 etag: Option<&str>,
122) -> ClResult<bool> {
123 tracing::debug!("Refreshing profile {} (etag: {:?})", id_tag, etag);
124
125 let result: ClResult<ConditionalResult<ApiResponse<RemoteProfile>>> =
127 app.request.get_conditional(id_tag, "/me", etag).await;
128
129 match result {
130 Ok(ConditionalResult::NotModified) => {
131 tracing::debug!("Profile {} not modified (304), updating synced_at", id_tag);
133
134 let update = UpdateProfileData { synced: Patch::Value(true), ..Default::default() };
135
136 app.meta_adapter.update_profile(tn_id, id_tag, &update).await?;
137 Ok(false)
138 }
139 Ok(ConditionalResult::Modified { data: api_response, etag: new_etag }) => {
140 let remote = api_response.data;
141 tracing::info!(
142 "Profile {} modified, updating (name={}, profile_pic={:?}, etag={:?})",
143 id_tag,
144 remote.name,
145 remote.profile_pic,
146 new_etag
147 );
148
149 let current_profile_pic = app
151 .meta_adapter
152 .read_profile(tn_id, id_tag)
153 .await
154 .ok()
155 .and_then(|(_, p)| p.profile_pic);
156
157 let profile_pic_changed =
160 remote.profile_pic.as_deref() != current_profile_pic.as_deref();
161 let profile_pic_synced = if profile_pic_changed {
162 if let Some(ref file_id) = remote.profile_pic {
163 match sync_profile_pic_variant(app, tn_id, id_tag, file_id).await {
164 Ok(_) => true,
165 Err(_) => {
166 tracing::debug!("Keeping old profile picture for {}", id_tag);
168 false
169 }
170 }
171 } else {
172 true
174 }
175 } else {
176 false
178 };
179
180 let mut update = UpdateProfileData {
182 name: Patch::Value(remote.name.into()),
183 synced: Patch::Value(true),
184 ..Default::default()
185 };
186
187 if profile_pic_synced {
189 update.profile_pic = Patch::Value(remote.profile_pic.clone().map(|s| s.into()));
190 }
191
192 if profile_pic_synced || !profile_pic_changed {
195 if let Some(etag) = new_etag {
196 update.etag = Patch::Value(etag);
197 }
198 }
199
200 app.meta_adapter.update_profile(tn_id, id_tag, &update).await?;
201
202 Ok(true)
203 }
204 Err(e) => {
205 tracing::warn!("Failed to refresh profile {}: {}", id_tag, e);
206 let update = UpdateProfileData { synced: Patch::Value(true), ..Default::default() };
209 let _ = app.meta_adapter.update_profile(tn_id, id_tag, &update).await;
210 Ok(false)
211 }
212 }
213}
214
215async fn sync_profile_pic_variant(
223 app: &App,
224 tn_id: TnId,
225 id_tag: &str,
226 file_id: &str,
227) -> ClResult<()> {
228 use cloudillo_file::sync::sync_file_variants;
229
230 tracing::debug!(
231 "Syncing profile picture variant 'vis.pf' for {} (file_id: {})",
232 id_tag,
233 file_id
234 );
235
236 let result = sync_file_variants(app, tn_id, id_tag, file_id, Some(&["vis.pf"]), false).await?;
238
239 let vis_pf_synced = result.synced_variants.iter().any(|v| v == "vis.pf");
241 let vis_pf_skipped = result.skipped_variants.iter().any(|v| v == "vis.pf");
242 let vis_pf_failed = result.failed_variants.iter().any(|v| v == "vis.pf");
243
244 if vis_pf_synced {
245 tracing::info!(
246 "Synced profile picture variant 'vis.pf' for {} (file_id: {})",
247 id_tag,
248 file_id
249 );
250 Ok(())
251 } else if vis_pf_skipped {
252 tracing::debug!(
253 "Profile picture variant 'vis.pf' already exists for {} (file_id: {})",
254 id_tag,
255 file_id
256 );
257 Ok(())
258 } else if vis_pf_failed {
259 tracing::warn!(
260 "Failed to sync profile picture variant 'vis.pf' for {} (file_id: {})",
261 id_tag,
262 file_id
263 );
264 Err(Error::NetworkError("vis.pf variant sync failed".into()))
265 } else {
266 tracing::warn!(
268 "No 'vis.pf' variant found in descriptor for profile picture {} (file_id: {})",
269 id_tag,
270 file_id
271 );
272 Err(Error::NotFound)
273 }
274}
275
276#[derive(Debug, Clone, Serialize, Deserialize)]
281pub struct ProfileRefreshBatchTask;
282
283#[async_trait]
284impl Task<App> for ProfileRefreshBatchTask {
285 fn kind() -> &'static str {
286 "profile.refresh_batch"
287 }
288
289 fn kind_of(&self) -> &'static str {
290 Self::kind()
291 }
292
293 fn build(_id: u64, _context: &str) -> ClResult<Arc<dyn Task<App>>> {
294 Ok(Arc::new(ProfileRefreshBatchTask))
295 }
296
297 fn serialize(&self) -> String {
298 "{}".to_string()
299 }
300
301 async fn run(&self, app: &App) -> ClResult<()> {
302 info!("Starting profile refresh batch task");
303
304 let stale_profiles = app
306 .meta_adapter
307 .list_stale_profiles(STALE_PROFILE_THRESHOLD_SECS, BATCH_SIZE)
308 .await?;
309
310 let total = stale_profiles.len();
311 if total == 0 {
312 info!("No stale profiles to refresh");
313 return Ok(());
314 }
315
316 info!("Found {} stale profiles to refresh", total);
317
318 let results: Vec<_> = stream::iter(stale_profiles)
320 .map(|(tn_id, id_tag, etag)| {
321 let app = app.clone();
322 async move {
323 let result = refresh_profile(&app, tn_id, &id_tag, etag.as_deref()).await;
324 (id_tag, result)
325 }
326 })
327 .buffer_unordered(10)
328 .collect()
329 .await;
330
331 let mut refreshed = 0;
333 let mut not_modified = 0;
334 let mut errors = 0;
335
336 for (id_tag, result) in results {
337 match result {
338 Ok(true) => refreshed += 1,
339 Ok(false) => not_modified += 1,
340 Err(e) => {
341 warn!("Error refreshing profile {}: {}", id_tag, e);
342 errors += 1;
343 }
344 }
345 }
346
347 info!(
348 "Profile refresh batch complete: {} refreshed, {} not modified, {} errors",
349 refreshed, not_modified, errors
350 );
351
352 Ok(())
353 }
354}
355
356