Skip to main content

codetether_agent/tui/chat/sync/
worker.rs

1//! Main chat sync worker loop.
2
3use std::path::PathBuf;
4use std::time::Duration;
5
6use super::batch_upload::sync_batch;
7use super::config_types::ChatSyncConfig;
8use super::minio_client::build_minio_client;
9use super::s3_key::sanitize_s3_key_segment;
10use super::s3_key::{chat_sync_checkpoint_path, load_chat_sync_offset, store_chat_sync_offset};
11use super::types::ChatSyncUiEvent;
12
13/// Run the chat sync worker loop.
14pub async fn run_chat_sync_worker(
15    tx: tokio::sync::mpsc::Sender<ChatSyncUiEvent>,
16    archive_path: PathBuf,
17    config: ChatSyncConfig,
18) {
19    let cp = chat_sync_checkpoint_path(&archive_path, &config);
20    let mut offset = load_chat_sync_offset(&cp);
21    let host =
22        sanitize_s3_key_segment(&std::env::var("HOSTNAME").unwrap_or_else(|_| "localhost".into()));
23    let _ = tx
24        .send(ChatSyncUiEvent::Status(format!(
25            "Archive sync → {} / {} every {}s",
26            config.endpoint, config.bucket, config.interval_secs,
27        )))
28        .await;
29    let mut iv = tokio::time::interval(Duration::from_secs(config.interval_secs));
30    iv.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
31    loop {
32        iv.tick().await;
33        let client = match build_minio_client(&config.endpoint, &config) {
34            Ok(c) => c,
35            Err(e) => {
36                let _ = tx
37                    .send(ChatSyncUiEvent::Error(format!("client init: {e}")))
38                    .await;
39                continue;
40            }
41        };
42        match sync_batch(&client, &archive_path, &config, &host, offset).await {
43            Ok(Some(b)) => {
44                offset = b.next_offset;
45                store_chat_sync_offset(&cp, offset);
46                let _ = tx
47                    .send(ChatSyncUiEvent::BatchUploaded {
48                        bytes: b.bytes,
49                        records: b.records,
50                        object_key: b.object_key,
51                    })
52                    .await;
53            }
54            Ok(None) => {}
55            Err(e) => {
56                let _ = tx.send(ChatSyncUiEvent::Error(format!("sync: {e}"))).await;
57            }
58        }
59    }
60}