1use crate::prelude::*;
4use cloudillo_core::request::ConditionalResult;
5use cloudillo_core::scheduler::Task;
6use cloudillo_types::meta_adapter::{
7 Profile, ProfileConnectionStatus, ProfileType, UpdateProfileData,
8};
9use cloudillo_types::types::ApiResponse;
10
11use async_trait::async_trait;
12use futures::stream::{self, StreamExt};
13use serde::{Deserialize, Serialize};
14use std::sync::Arc;
15
/// Staleness threshold in seconds (24 hours), passed to `list_stale_profiles`
/// by the profile refresh batch task.
const STALE_PROFILE_THRESHOLD_SECS: i64 = 24 * 60 * 60;

/// Batch size passed to `list_stale_profiles` on each run of the refresh task.
/// NOTE(review): presumably the maximum number of profiles returned per run —
/// confirm against the meta adapter.
const BATCH_SIZE: u32 = 100;
20
21#[derive(Debug, serde::Deserialize)]
23#[serde(rename_all = "camelCase")]
24pub struct RemoteProfile {
25 pub id_tag: String,
26 pub name: String,
27 #[serde(rename = "type")]
28 pub r#type: String,
29 pub profile_pic: Option<String>,
30 pub cover_pic: Option<String>,
31}
32
33pub async fn ensure_profile(app: &App, tn_id: TnId, id_tag: &str) -> ClResult<bool> {
42 if app.meta_adapter.read_profile(tn_id, id_tag).await.is_ok() {
44 tracing::debug!("Profile {} already exists locally", id_tag);
45 return Ok(false);
46 }
47
48 tracing::info!("Syncing profile {} from remote instance", id_tag);
50
51 let fetch_result: ClResult<ApiResponse<RemoteProfile>> =
52 app.request.get_noauth(tn_id, id_tag, "/me").await;
53
54 match fetch_result {
55 Ok(api_response) => {
56 let remote = api_response.data;
57
58 let typ = match remote.r#type.as_str() {
60 "community" => ProfileType::Community,
61 _ => ProfileType::Person,
62 };
63
64 let synced_profile_pic = if let Some(ref file_id) = remote.profile_pic {
67 match sync_profile_pic_variant(app, tn_id, id_tag, file_id).await {
68 Ok(()) => Some(file_id.as_str()),
69 Err(e) => {
70 tracing::warn!(
71 "Failed to sync profile picture for {}: {} (continuing without profile_pic)",
72 id_tag,
73 e
74 );
75 None
76 }
77 }
78 } else {
79 None
80 };
81
82 let profile = Profile {
84 id_tag: remote.id_tag.as_str(),
85 name: remote.name.as_str(),
86 typ,
87 profile_pic: synced_profile_pic,
88 following: false, connected: ProfileConnectionStatus::Disconnected,
90 roles: None,
91 };
92
93 let etag = format!("sync-{}", Timestamp::now().0);
95
96 app.meta_adapter.create_profile(tn_id, &profile, &etag).await?;
97
98 tracing::info!("Successfully synced profile {} from remote", id_tag);
99 Ok(true)
100 }
101 Err(e) => {
102 tracing::warn!("Failed to fetch profile {} from remote: {}", id_tag, e);
103 Err(e)
104 }
105 }
106}
107
108pub async fn refresh_profile(
118 app: &App,
119 tn_id: TnId,
120 id_tag: &str,
121 etag: Option<&str>,
122) -> ClResult<bool> {
123 tracing::debug!("Refreshing profile {} (etag: {:?})", id_tag, etag);
124
125 let result: ClResult<ConditionalResult<ApiResponse<RemoteProfile>>> =
127 app.request.get_conditional(id_tag, "/me", etag).await;
128
129 match result {
130 Ok(ConditionalResult::NotModified) => {
131 tracing::debug!("Profile {} not modified (304), updating synced_at", id_tag);
133
134 let update = UpdateProfileData { synced: Patch::Value(true), ..Default::default() };
135
136 app.meta_adapter.update_profile(tn_id, id_tag, &update).await?;
137 Ok(false)
138 }
139 Ok(ConditionalResult::Modified { data: api_response, etag: new_etag }) => {
140 let remote = api_response.data;
141 tracing::info!(
142 "Profile {} modified, updating (name={}, profile_pic={:?}, etag={:?})",
143 id_tag,
144 remote.name,
145 remote.profile_pic,
146 new_etag
147 );
148
149 let current_profile_pic = app
151 .meta_adapter
152 .read_profile(tn_id, id_tag)
153 .await
154 .ok()
155 .and_then(|(_, p)| p.profile_pic);
156
157 let profile_pic_changed =
160 remote.profile_pic.as_deref() != current_profile_pic.as_deref();
161 let profile_pic_synced = if profile_pic_changed {
162 if let Some(ref file_id) = remote.profile_pic {
163 if sync_profile_pic_variant(app, tn_id, id_tag, file_id).await.is_ok() {
164 true
165 } else {
166 tracing::debug!("Keeping old profile picture for {}", id_tag);
168 false
169 }
170 } else {
171 true
173 }
174 } else {
175 false
177 };
178
179 let mut update = UpdateProfileData {
181 name: Patch::Value(remote.name.into()),
182 synced: Patch::Value(true),
183 ..Default::default()
184 };
185
186 if profile_pic_synced {
188 update.profile_pic = Patch::Value(remote.profile_pic.clone().map(Into::into));
189 }
190
191 if profile_pic_synced || !profile_pic_changed {
194 if let Some(etag) = new_etag {
195 update.etag = Patch::Value(etag);
196 }
197 }
198
199 app.meta_adapter.update_profile(tn_id, id_tag, &update).await?;
200
201 Ok(true)
202 }
203 Err(e) => {
204 tracing::warn!("Failed to refresh profile {}: {}", id_tag, e);
205 let update = UpdateProfileData { synced: Patch::Value(true), ..Default::default() };
208 let _ = app.meta_adapter.update_profile(tn_id, id_tag, &update).await;
209 Ok(false)
210 }
211 }
212}
213
214async fn sync_profile_pic_variant(
222 app: &App,
223 tn_id: TnId,
224 id_tag: &str,
225 file_id: &str,
226) -> ClResult<()> {
227 use cloudillo_file::sync::sync_file_variants;
228
229 tracing::debug!(
230 "Syncing profile picture variant 'vis.pf' for {} (file_id: {})",
231 id_tag,
232 file_id
233 );
234
235 let result = sync_file_variants(app, tn_id, id_tag, file_id, Some(&["vis.pf"]), false).await?;
237
238 let vis_pf_synced = result.synced_variants.iter().any(|v| v == "vis.pf");
240 let vis_pf_skipped = result.skipped_variants.iter().any(|v| v == "vis.pf");
241 let vis_pf_failed = result.failed_variants.iter().any(|v| v == "vis.pf");
242
243 if vis_pf_synced {
244 tracing::info!(
245 "Synced profile picture variant 'vis.pf' for {} (file_id: {})",
246 id_tag,
247 file_id
248 );
249 Ok(())
250 } else if vis_pf_skipped {
251 tracing::debug!(
252 "Profile picture variant 'vis.pf' already exists for {} (file_id: {})",
253 id_tag,
254 file_id
255 );
256 Ok(())
257 } else if vis_pf_failed {
258 tracing::warn!(
259 "Failed to sync profile picture variant 'vis.pf' for {} (file_id: {})",
260 id_tag,
261 file_id
262 );
263 Err(Error::NetworkError("vis.pf variant sync failed".into()))
264 } else {
265 tracing::warn!(
267 "No 'vis.pf' variant found in descriptor for profile picture {} (file_id: {})",
268 id_tag,
269 file_id
270 );
271 Err(Error::NotFound)
272 }
273}
274
275#[derive(Debug, Clone, Serialize, Deserialize)]
280pub struct ProfileRefreshBatchTask;
281
282#[async_trait]
283impl Task<App> for ProfileRefreshBatchTask {
284 fn kind() -> &'static str {
285 "profile.refresh_batch"
286 }
287
288 fn kind_of(&self) -> &'static str {
289 Self::kind()
290 }
291
292 fn build(_id: u64, _context: &str) -> ClResult<Arc<dyn Task<App>>> {
293 Ok(Arc::new(ProfileRefreshBatchTask))
294 }
295
296 fn serialize(&self) -> String {
297 "{}".to_string()
298 }
299
300 async fn run(&self, app: &App) -> ClResult<()> {
301 info!("Starting profile refresh batch task");
302
303 let stale_profiles = app
305 .meta_adapter
306 .list_stale_profiles(STALE_PROFILE_THRESHOLD_SECS, BATCH_SIZE)
307 .await?;
308
309 let total = stale_profiles.len();
310 if total == 0 {
311 info!("No stale profiles to refresh");
312 return Ok(());
313 }
314
315 info!("Found {} stale profiles to refresh", total);
316
317 let results: Vec<_> = stream::iter(stale_profiles)
319 .map(|(tn_id, id_tag, etag)| {
320 let app = app.clone();
321 async move {
322 let result = refresh_profile(&app, tn_id, &id_tag, etag.as_deref()).await;
323 (id_tag, result)
324 }
325 })
326 .buffer_unordered(10)
327 .collect()
328 .await;
329
330 let mut refreshed = 0;
332 let mut not_modified = 0;
333 let mut errors = 0;
334
335 for (id_tag, result) in results {
336 match result {
337 Ok(true) => refreshed += 1,
338 Ok(false) => not_modified += 1,
339 Err(e) => {
340 warn!("Error refreshing profile {}: {}", id_tag, e);
341 errors += 1;
342 }
343 }
344 }
345
346 info!(
347 "Profile refresh batch complete: {} refreshed, {} not modified, {} errors",
348 refreshed, not_modified, errors
349 );
350
351 Ok(())
352 }
353}
354
355