codetether_agent/tui/chat/sync/
worker.rs1use std::path::PathBuf;
4use std::time::Duration;
5
6use super::batch_upload::sync_batch;
7use super::config_types::ChatSyncConfig;
8use super::minio_client::build_minio_client;
9use super::s3_key::sanitize_s3_key_segment;
10use super::s3_key::{chat_sync_checkpoint_path, load_chat_sync_offset, store_chat_sync_offset};
11use super::types::ChatSyncUiEvent;
12
13pub async fn run_chat_sync_worker(
15 tx: tokio::sync::mpsc::Sender<ChatSyncUiEvent>,
16 archive_path: PathBuf,
17 config: ChatSyncConfig,
18) {
19 let cp = chat_sync_checkpoint_path(&archive_path, &config);
20 let mut offset = load_chat_sync_offset(&cp);
21 let host =
22 sanitize_s3_key_segment(&std::env::var("HOSTNAME").unwrap_or_else(|_| "localhost".into()));
23 let _ = tx
24 .send(ChatSyncUiEvent::Status(format!(
25 "Archive sync → {} / {} every {}s",
26 config.endpoint, config.bucket, config.interval_secs,
27 )))
28 .await;
29 let mut iv = tokio::time::interval(Duration::from_secs(config.interval_secs));
30 iv.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
31 loop {
32 iv.tick().await;
33 let client = match build_minio_client(&config.endpoint, &config) {
34 Ok(c) => c,
35 Err(e) => {
36 let _ = tx
37 .send(ChatSyncUiEvent::Error(format!("client init: {e}")))
38 .await;
39 continue;
40 }
41 };
42 match sync_batch(&client, &archive_path, &config, &host, offset).await {
43 Ok(Some(b)) => {
44 offset = b.next_offset;
45 store_chat_sync_offset(&cp, offset);
46 let _ = tx
47 .send(ChatSyncUiEvent::BatchUploaded {
48 bytes: b.bytes,
49 records: b.records,
50 object_key: b.object_key,
51 })
52 .await;
53 }
54 Ok(None) => {}
55 Err(e) => {
56 let _ = tx.send(ChatSyncUiEvent::Error(format!("sync: {e}"))).await;
57 }
58 }
59 }
60}