Skip to main content

codetether_agent/bus/
s3_sink.rs

1//! MinIO S3 Bus Sink
2//!
3//! Subscribes to the AgentBus and uploads all messages to MinIO S3
4//! as structured JSONL records suitable for LLM pretraining.
5//!
6//! Each bus message is transformed into a training-friendly record with:
7//! - Clear role attribution (system/user/assistant/tool)
8//! - Paired tool_call → tool_result sequences
//! - Full, untruncated text content (code, file paths, arguments, outputs); non-text `Data`/`File` message parts are recorded as placeholders
10//! - Rich metadata for filtering, deduplication, and curriculum design
11//!
//! Each record follows the OpenAI chat-completions *message* schema (one
//! message per line, plus a `metadata` object), so records can be grouped
//! into conversations and fed into fine-tuning pipelines (SFT, DPO, RLHF).
14//!
15//! ## Usage
16//!
17//! ```rust,ignore
18//! use codetether_agent::bus::s3_sink::BusS3Sink;
19//!
20//! let sink = BusS3Sink::new(
21//!     bus.clone(),
22//!     "http://localhost:9000",
23//!     "access-key",
24//!     "secret-key",
25//!     "codetether-training",
26//!     "bus/",
27//! ).await?;
28//!
29//! // Runs forever, uploading bus messages to S3
30//! sink.run().await;
31//! ```
32
33use super::{AgentBus, BusEnvelope, BusMessage};
34use crate::a2a::types::Part;
35use crate::secrets;
36use anyhow::{Context, Result};
37use chrono::Utc;
38use minio::s3::builders::ObjectContent;
39use minio::s3::creds::StaticProvider;
40use minio::s3::http::BaseUrl;
41use minio::s3::types::S3Api;
42use minio::s3::{Client as MinioClient, ClientBuilder as MinioClientBuilder};
43use serde::{Deserialize, Serialize};
44use std::sync::Arc;
45use tokio::sync::broadcast;
46use tokio::task;
47use tracing::{debug, error, info, warn};
48
49/// Configuration for the bus S3 sink
50#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct BusS3SinkConfig {
52    /// MinIO/S3 endpoint URL (e.g., "http://localhost:9000")
53    pub endpoint: String,
54    /// Access key
55    pub access_key: String,
56    /// Secret key
57    pub secret_key: String,
58    /// Bucket name for bus logs
59    pub bucket: String,
60    /// Path prefix within the bucket
61    #[serde(default = "default_prefix")]
62    pub prefix: String,
63    /// Batch size before flushing to S3
64    #[serde(default = "default_batch_size")]
65    pub batch_size: usize,
66    /// Max batch age in seconds before flushing
67    #[serde(default = "default_flush_interval_secs")]
68    pub flush_interval_secs: u64,
69    /// Whether to use SSL/TLS
70    #[serde(default)]
71    pub secure: bool,
72    /// Whether to ignore certificate errors (for self-signed certs)
73    #[serde(default)]
74    pub ignore_cert: bool,
75}
76
/// Serde default for `BusS3SinkConfig::prefix`.
fn default_prefix() -> String {
    String::from("training/")
}
80
/// Serde default for `BusS3SinkConfig::batch_size`.
fn default_batch_size() -> usize {
    const DEFAULT_BATCH_SIZE: usize = 100;
    DEFAULT_BATCH_SIZE
}
84
/// Serde default for `BusS3SinkConfig::flush_interval_secs`, in seconds.
fn default_flush_interval_secs() -> u64 {
    const DEFAULT_FLUSH_SECS: u64 = 30;
    DEFAULT_FLUSH_SECS
}
88
89impl BusS3SinkConfig {
90    /// Create config from environment variables
91    ///
92    /// Required:
93    /// - `MINIO_ENDPOINT` or `CODETETHER_BUS_S3_ENDPOINT`
94    /// - `MINIO_ACCESS_KEY` or `CODETETHER_BUS_S3_ACCESS_KEY`
95    /// - `MINIO_SECRET_KEY` or `CODETETHER_BUS_S3_SECRET_KEY`
96    ///
97    /// Optional:
98    /// - `CODETETHER_BUS_S3_BUCKET` (default: "codetether-training")
99    /// - `CODETETHER_BUS_S3_PREFIX` (default: "bus/")
100    pub fn from_env() -> Result<Self> {
101        let endpoint = std::env::var("MINIO_ENDPOINT")
102            .or_else(|_| std::env::var("CODETETHER_BUS_S3_ENDPOINT"))
103            .context("MINIO_ENDPOINT or CODETETHER_BUS_S3_ENDPOINT required for bus S3 sink")?;
104
105        let access_key = std::env::var("MINIO_ACCESS_KEY")
106            .or_else(|_| std::env::var("CODETETHER_BUS_S3_ACCESS_KEY"))
107            .context("MINIO_ACCESS_KEY or CODETETHER_BUS_S3_ACCESS_KEY required")?;
108
109        let secret_key = std::env::var("MINIO_SECRET_KEY")
110            .or_else(|_| std::env::var("CODETETHER_BUS_S3_SECRET_KEY"))
111            .context("MINIO_SECRET_KEY or CODETETHER_BUS_S3_SECRET_KEY required")?;
112
113        Ok(Self {
114            endpoint,
115            access_key,
116            secret_key,
117            bucket: std::env::var("CODETETHER_BUS_S3_BUCKET")
118                .unwrap_or_else(|_| "codetether-training".to_string()),
119            prefix: std::env::var("CODETETHER_BUS_S3_PREFIX")
120                .unwrap_or_else(|_| "training/".to_string()),
121            batch_size: std::env::var("CODETETHER_BUS_S3_BATCH_SIZE")
122                .ok()
123                .and_then(|s| s.parse().ok())
124                .unwrap_or(100),
125            flush_interval_secs: std::env::var("CODETETHER_BUS_S3_FLUSH_SECS")
126                .ok()
127                .and_then(|s| s.parse().ok())
128                .unwrap_or(30),
129            secure: std::env::var("MINIO_SECURE")
130                .ok()
131                .map(|s| s.to_lowercase() == "true")
132                .unwrap_or(false),
133            ignore_cert: std::env::var("MINIO_IGNORE_CERT")
134                .ok()
135                .map(|s| s.to_lowercase() == "true")
136                .unwrap_or(false),
137        })
138    }
139
140    /// Create config by trying multiple credential sources in order:
141    ///
142    /// 1. Bus-specific env vars (`MINIO_ENDPOINT`, `CODETETHER_BUS_S3_ENDPOINT`)
143    /// 2. Chat-sync env vars (`CODETETHER_CHAT_SYNC_MINIO_ENDPOINT`)
144    /// 3. Vault secrets at `secret/codetether/providers/chat-sync-minio`
145    pub async fn from_env_or_vault() -> Result<Self> {
146        // Fast path: original env-only method
147        if let Ok(cfg) = Self::from_env() {
148            return Ok(cfg);
149        }
150
151        // Try chat-sync env vars
152        let endpoint = env_non_empty("CODETETHER_CHAT_SYNC_MINIO_ENDPOINT");
153        let access_key = env_non_empty("CODETETHER_CHAT_SYNC_MINIO_ACCESS_KEY");
154        let secret_key = env_non_empty("CODETETHER_CHAT_SYNC_MINIO_SECRET_KEY");
155
156        if let (Some(ep), Some(ak), Some(sk)) =
157            (endpoint.clone(), access_key.clone(), secret_key.clone())
158        {
159            info!("Bus S3 sink using chat-sync env vars");
160            return Ok(Self {
161                endpoint: ep,
162                access_key: ak,
163                secret_key: sk,
164                bucket: std::env::var("CODETETHER_BUS_S3_BUCKET")
165                    .unwrap_or_else(|_| "codetether-training".to_string()),
166                prefix: std::env::var("CODETETHER_BUS_S3_PREFIX")
167                    .unwrap_or_else(|_| "training/".to_string()),
168                batch_size: 100,
169                flush_interval_secs: 30,
170                secure: false,
171                ignore_cert: false,
172            });
173        }
174
175        // Try Vault: chat-sync-minio provider
176        if let Some(secrets) = secrets::get_provider_secrets("chat-sync-minio").await {
177            let ep = secrets
178                .base_url
179                .clone()
180                .or_else(|| vault_extra_str(&secrets, &["endpoint", "minio_endpoint", "url"]))
181                .filter(|s| !s.is_empty());
182            let ak = vault_extra_str(
183                &secrets,
184                &["access_key", "access_key_id", "minio_access_key"],
185            )
186            .or_else(|| secrets.api_key.clone())
187            .filter(|s| !s.is_empty());
188            let sk = vault_extra_str(
189                &secrets,
190                &["secret_key", "secret_access_key", "minio_secret_key"],
191            )
192            .filter(|s| !s.is_empty());
193
194            if let (Some(ep), Some(ak), Some(sk)) = (ep, ak, sk) {
195                info!("Bus S3 sink using Vault chat-sync-minio credentials");
196                return Ok(Self {
197                    endpoint: ep,
198                    access_key: ak,
199                    secret_key: sk,
200                    bucket: std::env::var("CODETETHER_BUS_S3_BUCKET")
201                        .unwrap_or_else(|_| "codetether-training".to_string()),
202                    prefix: std::env::var("CODETETHER_BUS_S3_PREFIX")
203                        .unwrap_or_else(|_| "training/".to_string()),
204                    batch_size: 100,
205                    flush_interval_secs: 30,
206                    secure: false,
207                    ignore_cert: false,
208                });
209            }
210        }
211
212        anyhow::bail!(
213            "No MinIO credentials found. Set MINIO_ENDPOINT/MINIO_ACCESS_KEY/MINIO_SECRET_KEY, \
214             CODETETHER_CHAT_SYNC_MINIO_* env vars, or configure chat-sync-minio in Vault."
215        )
216    }
217}
218
/// Look up environment variable `key`.
///
/// Returns `None` when the variable is missing, not valid Unicode, or set to
/// the empty string; otherwise returns its value.
fn env_non_empty(key: &str) -> Option<String> {
    match std::env::var(key) {
        Ok(value) if !value.is_empty() => Some(value),
        _ => None,
    }
}
223
224/// Extract a string value from `ProviderSecrets.extra`, trying multiple key names.
225fn vault_extra_str(secrets: &secrets::ProviderSecrets, keys: &[&str]) -> Option<String> {
226    for key in keys {
227        if let Some(val) = secrets.extra.get(*key) {
228            if let Some(s) = val.as_str() {
229                if !s.is_empty() {
230                    return Some(s.to_string());
231                }
232            }
233        }
234    }
235    None
236}
237
238// ─── LLM Pretraining Record ─────────────────────────────────────────────
239
240/// A single training record in OpenAI chat-completions format.
241///
242/// Each bus envelope maps to one `TrainingRecord` line in the JSONL output.
243/// The schema is compatible with OpenAI fine-tuning, Axolotl, and similar
244/// pipelines so the data can be used directly for SFT / DPO / RLHF.
245#[derive(Debug, Clone, Serialize, Deserialize)]
246struct TrainingRecord {
247    /// "system" | "user" | "assistant" | "tool"
248    role: String,
249    /// Primary text content
250    #[serde(skip_serializing_if = "Option::is_none")]
251    content: Option<String>,
252    /// Tool calls made by the assistant (only when role == "assistant")
253    #[serde(skip_serializing_if = "Option::is_none")]
254    tool_calls: Option<Vec<TrainingToolCall>>,
255    /// Tool call id this result corresponds to (only when role == "tool")
256    #[serde(skip_serializing_if = "Option::is_none")]
257    tool_call_id: Option<String>,
258    /// Tool name (only when role == "tool")
259    #[serde(skip_serializing_if = "Option::is_none")]
260    name: Option<String>,
261    /// Envelope metadata for filtering and curriculum design
262    metadata: TrainingMetadata,
263}
264
265/// Represents a tool call in the assistant's response.
266#[derive(Debug, Clone, Serialize, Deserialize)]
267struct TrainingToolCall {
268    /// Unique id for this tool call
269    id: String,
270    /// Always "function"
271    #[serde(rename = "type")]
272    call_type: String,
273    /// The function being called
274    function: TrainingFunction,
275}
276
277/// Function name + arguments inside a tool call.
278#[derive(Debug, Clone, Serialize, Deserialize)]
279struct TrainingFunction {
280    name: String,
281    arguments: String,
282}
283
284/// Provenance metadata attached to every training record.
285#[derive(Debug, Clone, Serialize, Deserialize)]
286struct TrainingMetadata {
287    /// Original BusMessage variant name (snake_case)
288    bus_kind: String,
289    /// Envelope id
290    envelope_id: String,
291    /// ISO 8601 timestamp
292    timestamp: String,
293    /// Hierarchical topic
294    topic: String,
295    /// Agent that originated the message
296    sender_id: String,
297    /// Correlation id linking request ↔ response
298    #[serde(skip_serializing_if = "Option::is_none")]
299    correlation_id: Option<String>,
300}
301
302/// Convert a `BusEnvelope` into a `TrainingRecord`.
303fn envelope_to_training_record(env: &BusEnvelope) -> TrainingRecord {
304    let meta = TrainingMetadata {
305        bus_kind: bus_message_kind(&env.message),
306        envelope_id: env.id.clone(),
307        timestamp: env.timestamp.to_rfc3339(),
308        topic: env.topic.clone(),
309        sender_id: env.sender_id.clone(),
310        correlation_id: env.correlation_id.clone(),
311    };
312
313    match &env.message {
314        // ── Agent lifecycle → system ────────────────────────────────
315        BusMessage::AgentReady {
316            agent_id,
317            capabilities,
318        } => TrainingRecord {
319            role: "system".into(),
320            content: Some(format!(
321                "Agent `{agent_id}` ready. Capabilities: {}",
322                capabilities.join(", ")
323            )),
324            tool_calls: None,
325            tool_call_id: None,
326            name: None,
327            metadata: meta,
328        },
329
330        BusMessage::AgentShutdown { agent_id } => TrainingRecord {
331            role: "system".into(),
332            content: Some(format!("Agent `{agent_id}` shutting down.")),
333            tool_calls: None,
334            tool_call_id: None,
335            name: None,
336            metadata: meta,
337        },
338
339        // ── Agent messages → assistant ──────────────────────────────
340        BusMessage::AgentMessage { from, to, parts } => {
341            let text = parts_to_text(parts);
342            TrainingRecord {
343                role: "assistant".into(),
344                content: Some(format!("[{from} → {to}] {text}")),
345                tool_calls: None,
346                tool_call_id: None,
347                name: None,
348                metadata: meta,
349            }
350        }
351
352        // ── Task/artifact lifecycle → system ────────────────────────
353        BusMessage::TaskUpdate {
354            task_id,
355            state,
356            message,
357        } => {
358            let msg = message.as_deref().unwrap_or("");
359            TrainingRecord {
360                role: "system".into(),
361                content: Some(format!("Task `{task_id}` → {state:?}. {msg}")),
362                tool_calls: None,
363                tool_call_id: None,
364                name: None,
365                metadata: meta,
366            }
367        }
368
369        BusMessage::ArtifactUpdate { task_id, artifact } => {
370            let artifact_text = parts_to_text(&artifact.parts);
371            TrainingRecord {
372                role: "system".into(),
373                content: Some(format!(
374                    "Task `{task_id}` artifact `{}`: {artifact_text}",
375                    artifact.artifact_id
376                )),
377                tool_calls: None,
378                tool_call_id: None,
379                name: None,
380                metadata: meta,
381            }
382        }
383
384        // ── Shared results → system ────────────────────────────────
385        BusMessage::SharedResult { key, value, tags } => TrainingRecord {
386            role: "system".into(),
387            content: Some(format!(
388                "Shared result `{key}` [{}]: {}",
389                tags.join(", "),
390                serde_json::to_string(value).unwrap_or_default()
391            )),
392            tool_calls: None,
393            tool_call_id: None,
394            name: None,
395            metadata: meta,
396        },
397
398        // ── Tool request → assistant with tool_calls ────────────────
399        BusMessage::ToolRequest {
400            request_id,
401            tool_name,
402            arguments,
403            ..
404        } => TrainingRecord {
405            role: "assistant".into(),
406            content: None,
407            tool_calls: Some(vec![TrainingToolCall {
408                id: request_id.clone(),
409                call_type: "function".into(),
410                function: TrainingFunction {
411                    name: tool_name.clone(),
412                    arguments: serde_json::to_string(arguments).unwrap_or_default(),
413                },
414            }]),
415            tool_call_id: None,
416            name: None,
417            metadata: meta,
418        },
419
420        // ── Tool response → tool role ───────────────────────────────
421        BusMessage::ToolResponse {
422            request_id,
423            tool_name,
424            result,
425            success,
426            ..
427        } => TrainingRecord {
428            role: "tool".into(),
429            content: Some(if *success {
430                result.clone()
431            } else {
432                format!("[ERROR] {result}")
433            }),
434            tool_calls: None,
435            tool_call_id: Some(request_id.clone()),
436            name: Some(tool_name.clone()),
437            metadata: meta,
438        },
439
440        // ── Full tool output → tool role (untruncated) ──────────────
441        BusMessage::ToolOutputFull {
442            agent_id,
443            tool_name,
444            output,
445            success,
446            step,
447        } => TrainingRecord {
448            role: "tool".into(),
449            content: Some(if *success {
450                format!("[step {step}, agent {agent_id}] {output}")
451            } else {
452                format!("[step {step}, agent {agent_id}, ERROR] {output}")
453            }),
454            tool_calls: None,
455            tool_call_id: None,
456            name: Some(tool_name.clone()),
457            metadata: meta,
458        },
459
460        // ── Heartbeat → system (filtered out during flush) ──────────
461        BusMessage::Heartbeat { .. } => TrainingRecord {
462            role: "system".into(),
463            content: None,
464            tool_calls: None,
465            tool_call_id: None,
466            name: None,
467            metadata: meta,
468        },
469
470        // ── Ralph learnings → system context ────────────────────────
471        BusMessage::RalphLearning {
472            prd_id,
473            story_id,
474            iteration,
475            learnings,
476            context,
477        } => TrainingRecord {
478            role: "system".into(),
479            content: Some(format!(
480                "Ralph learning (PRD {prd_id}, story {story_id}, iter {iteration}):\n{}\nContext: {}",
481                learnings
482                    .iter()
483                    .map(|l| format!("- {l}"))
484                    .collect::<Vec<_>>()
485                    .join("\n"),
486                serde_json::to_string(context).unwrap_or_default()
487            )),
488            tool_calls: None,
489            tool_call_id: None,
490            name: None,
491            metadata: meta,
492        },
493
494        BusMessage::RalphHandoff {
495            prd_id,
496            from_story,
497            to_story,
498            context,
499            progress_summary,
500        } => TrainingRecord {
501            role: "system".into(),
502            content: Some(format!(
503                "Ralph handoff (PRD {prd_id}): {from_story} → {to_story}\nSummary: {progress_summary}\nContext: {}",
504                serde_json::to_string(context).unwrap_or_default()
505            )),
506            tool_calls: None,
507            tool_call_id: None,
508            name: None,
509            metadata: meta,
510        },
511
512        BusMessage::RalphProgress {
513            prd_id,
514            passed,
515            total,
516            iteration,
517            status,
518        } => TrainingRecord {
519            role: "system".into(),
520            content: Some(format!(
521                "Ralph progress (PRD {prd_id}): {passed}/{total} stories passed, iter {iteration}, status: {status}"
522            )),
523            tool_calls: None,
524            tool_call_id: None,
525            name: None,
526            metadata: meta,
527        },
528
529        // ── Agent thinking → assistant reasoning ────────────────────
530        BusMessage::AgentThinking {
531            agent_id,
532            thinking,
533            step,
534        } => TrainingRecord {
535            role: "assistant".into(),
536            content: Some(format!("<thinking>\n{thinking}\n</thinking>")),
537            tool_calls: None,
538            tool_call_id: None,
539            name: Some(format!("reasoning.{agent_id}.step_{step}")),
540            metadata: meta,
541        },
542
543        // ── Voice session lifecycle → system ────────────────────────
544        BusMessage::VoiceSessionStarted {
545            room_name,
546            agent_id,
547            voice_id,
548        } => TrainingRecord {
549            role: "system".into(),
550            content: Some(format!(
551                "Voice session started: room={room_name}, agent={agent_id}, voice={voice_id}"
552            )),
553            tool_calls: None,
554            tool_call_id: None,
555            name: None,
556            metadata: meta,
557        },
558
559        BusMessage::VoiceTranscript {
560            room_name,
561            text,
562            role,
563            is_final,
564        } => TrainingRecord {
565            role: if role == "user" {
566                "user".into()
567            } else {
568                "assistant".into()
569            },
570            content: Some(format!(
571                "[voice:{room_name}{}] {text}",
572                if *is_final { " final" } else { "" }
573            )),
574            tool_calls: None,
575            tool_call_id: None,
576            name: None,
577            metadata: meta,
578        },
579
580        BusMessage::VoiceAgentStateChanged { room_name, state } => TrainingRecord {
581            role: "system".into(),
582            content: Some(format!("Voice agent state: room={room_name} → {state}")),
583            tool_calls: None,
584            tool_call_id: None,
585            name: None,
586            metadata: meta,
587        },
588
589        BusMessage::VoiceSessionEnded { room_name, reason } => TrainingRecord {
590            role: "system".into(),
591            content: Some(format!(
592                "Voice session ended: room={room_name}, reason={reason}"
593            )),
594            tool_calls: None,
595            tool_call_id: None,
596            name: None,
597            metadata: meta,
598        },
599    }
600}
601
602/// Extract the serde tag name from a `BusMessage` variant.
603fn bus_message_kind(msg: &BusMessage) -> String {
604    serde_json::to_value(msg)
605        .ok()
606        .and_then(|v| v.get("kind").and_then(|k| k.as_str()).map(String::from))
607        .unwrap_or_else(|| "unknown".into())
608}
609
610/// Concatenate `Part` items into a single text string.
611fn parts_to_text(parts: &[Part]) -> String {
612    parts
613        .iter()
614        .map(|p| match p {
615            Part::Text { text } => text.as_str(),
616            Part::Data { .. } => "<<data>>",
617            Part::File { .. } => "<<file>>",
618        })
619        .collect::<Vec<_>>()
620        .join("\n")
621}
622
623// ─── S3 Sink ─────────────────────────────────────────────────────────────
624
625/// S3 sink that archives all bus messages as JSONL training records
626pub struct BusS3Sink {
627    #[allow(dead_code)]
628    bus: Arc<AgentBus>,
629    client: MinioClient,
630    config: BusS3SinkConfig,
631    rx: broadcast::Receiver<BusEnvelope>,
632}
633
634impl BusS3Sink {
635    /// Create a new bus S3 sink
636    pub async fn new(
637        bus: Arc<AgentBus>,
638        endpoint: &str,
639        access_key: &str,
640        secret_key: &str,
641        bucket: &str,
642        prefix: &str,
643    ) -> Result<Self> {
644        let config = BusS3SinkConfig {
645            endpoint: endpoint.to_string(),
646            access_key: access_key.to_string(),
647            secret_key: secret_key.to_string(),
648            bucket: bucket.to_string(),
649            prefix: prefix.to_string(),
650            batch_size: 100,
651            flush_interval_secs: 30,
652            secure: endpoint.starts_with("https"),
653            ignore_cert: false,
654        };
655
656        Self::from_config(bus, config).await
657    }
658
659    /// Create sink from configuration
660    pub async fn from_config(bus: Arc<AgentBus>, config: BusS3SinkConfig) -> Result<Self> {
661        let endpoint = normalize_endpoint(&config.endpoint, config.secure);
662
663        let base_url: BaseUrl = endpoint.parse().context("Invalid MinIO endpoint URL")?;
664
665        let static_provider = StaticProvider::new(&config.access_key, &config.secret_key, None);
666
667        let client = MinioClientBuilder::new(base_url)
668            .provider(Some(Box::new(static_provider)))
669            .ignore_cert_check(Some(config.ignore_cert))
670            .build()?;
671
672        let rx = bus.tx.subscribe();
673
674        Ok(Self {
675            bus,
676            client,
677            config,
678            rx,
679        })
680    }
681
682    /// Create sink from environment variables
683    pub async fn from_env(bus: Arc<AgentBus>) -> Result<Self> {
684        let config = BusS3SinkConfig::from_env()?;
685        Self::from_config(bus, config).await
686    }
687
688    /// Ensure the bucket exists, creating it if necessary
689    pub async fn ensure_bucket(&self) -> Result<()> {
690        match self.client.bucket_exists(&self.config.bucket).send().await {
691            Ok(resp) if resp.exists => {
692                debug!(bucket = %self.config.bucket, "S3 bucket exists");
693            }
694            Ok(_) => {
695                info!(bucket = %self.config.bucket, "Creating S3 bucket");
696                match self.client.create_bucket(&self.config.bucket).send().await {
697                    Ok(_) => {}
698                    Err(e) => {
699                        let err_text = e.to_string();
700                        if !err_text.contains("BucketAlreadyOwnedByYou")
701                            && !err_text.contains("BucketAlreadyExists")
702                        {
703                            return Err(anyhow::anyhow!("Failed to create bucket: {err_text}"));
704                        }
705                        debug!(bucket = %self.config.bucket, "Bucket already exists");
706                    }
707                }
708            }
709            Err(e) => {
710                debug!(error = %e, bucket = %self.config.bucket, "Bucket check returned error (may already exist)");
711            }
712        }
713        Ok(())
714    }
715
716    /// Run the sink loop - subscribes to bus and uploads batches to S3
717    pub async fn run(mut self) -> Result<()> {
718        self.ensure_bucket().await?;
719
720        info!(
721            bucket = %self.config.bucket,
722            prefix = %self.config.prefix,
723            batch_size = self.config.batch_size,
724            flush_secs = self.config.flush_interval_secs,
725            "Bus S3 sink started (JSONL training record format)"
726        );
727
728        let mut batch: Vec<BusEnvelope> = Vec::with_capacity(self.config.batch_size);
729        let mut batch_start: Option<String> = None;
730        let mut flush_interval = tokio::time::interval(std::time::Duration::from_secs(
731            self.config.flush_interval_secs,
732        ));
733
734        loop {
735            tokio::select! {
736                result = self.rx.recv() => {
737                    match result {
738                        Ok(envelope) => {
739                            if batch_start.is_none() {
740                                batch_start = Some(envelope.timestamp.to_rfc3339());
741                            }
742                            batch.push(envelope);
743
744                            if batch.len() >= self.config.batch_size {
745                                if let Err(e) = self.flush_batch(&mut batch, &mut batch_start).await {
746                                    error!(error = %e, "Failed to flush batch");
747                                }
748                                // Yield to allow LLM requests priority access to executor
749                                task::yield_now().await;
750                            }
751                        }
752                        Err(broadcast::error::RecvError::Lagged(n)) => {
753                            warn!(skipped = n, "Bus S3 sink lagged, some messages dropped");
754                        }
755                        Err(broadcast::error::RecvError::Closed) => {
756                            info!("Bus channel closed, shutting down S3 sink");
757                            if !batch.is_empty() {
758                                if let Err(e) = self.flush_batch(&mut batch, &mut batch_start).await {
759                                    error!(error = %e, "Failed to flush final batch");
760                                }
761                            }
762                            return Ok(());
763                        }
764                    }
765                }
766
767                _ = flush_interval.tick() => {
768                    if !batch.is_empty() {
769                        if let Err(e) = self.flush_batch(&mut batch, &mut batch_start).await {
770                            error!(error = %e, "Failed to flush batch on interval");
771                        }
772                        // Yield to allow LLM requests priority access to executor
773                        task::yield_now().await;
774                    }
775                }
776            }
777        }
778    }
779
780    /// Flush the current batch to S3 as JSONL (one training record per line).
781    ///
782    /// Heartbeats are filtered out as noise. Every other envelope is
783    /// transformed into a `TrainingRecord` and serialized as a single JSON
784    /// line, making the file directly ingestible by LLM fine-tuning tools.
785    async fn flush_batch(
786        &self,
787        batch: &mut Vec<BusEnvelope>,
788        batch_start: &mut Option<String>,
789    ) -> Result<()> {
790        if batch.is_empty() {
791            return Ok(());
792        }
793
794        let _start_time = batch_start
795            .take()
796            .unwrap_or_else(|| Utc::now().to_rfc3339());
797        let envelopes = std::mem::take(batch);
798
799        // Build JSONL: one training record per line, skip heartbeats
800        let mut lines = Vec::with_capacity(envelopes.len());
801        for env in &envelopes {
802            if matches!(env.message, BusMessage::Heartbeat { .. }) {
803                continue;
804            }
805            let record = envelope_to_training_record(env);
806            if let Ok(line) = serde_json::to_string(&record) {
807                lines.push(line);
808            }
809        }
810
811        if lines.is_empty() {
812            return Ok(());
813        }
814
815        let count = lines.len();
816        let jsonl = lines.join("\n");
817
818        // S3 key: prefix/YYYY/MM/DD/HH/batch_YYYYMMDDTHHMMSS_uuid.jsonl
819        let now = Utc::now();
820        let date_path = now.format("%Y/%m/%d/%H").to_string();
821        let timestamp = now.format("%Y%m%dT%H%M%S").to_string();
822        let uuid = uuid::Uuid::new_v4();
823        let s3_key = format!(
824            "{}{}/batch_{}_{}.jsonl",
825            self.config.prefix, date_path, timestamp, uuid
826        );
827
828        let content = ObjectContent::from(jsonl.into_bytes());
829
830        match self
831            .client
832            .put_object_content(&self.config.bucket, &s3_key, content)
833            .send()
834            .await
835        {
836            Ok(_) => {
837                info!(
838                    bucket = %self.config.bucket,
839                    key = %s3_key,
840                    records = count,
841                    "Uploaded training records to S3"
842                );
843            }
844            Err(e) => {
845                error!(
846                    bucket = %self.config.bucket,
847                    key = %s3_key,
848                    error = %e,
849                    "Failed to upload training records to S3"
850                );
851                return Err(anyhow::anyhow!("S3 upload failed: {e}"));
852            }
853        }
854
855        Ok(())
856    }
857
858    /// Get the bucket name
859    pub fn bucket(&self) -> &str {
860        &self.config.bucket
861    }
862
863    /// Get the prefix
864    pub fn prefix(&self) -> &str {
865        &self.config.prefix
866    }
867}
868
/// Normalize an endpoint URL so it always carries a scheme and no trailing slash.
///
/// - Surrounding whitespace (e.g. a trailing newline from an env var or secret)
///   is removed.
/// - An explicit `http://` / `https://` scheme is recognised case-insensitively
///   (URL schemes are case-insensitive per RFC 3986) and emitted in lowercase.
///   An explicit scheme always wins over `secure`.
/// - Without a scheme, `https://` is prepended when `secure` is true, otherwise
///   `http://`.
/// - Trailing `/` characters are stripped from the part after the scheme, so a
///   bare `"http://"` is not mangled into `"http://http:"`.
fn normalize_endpoint(endpoint: &str, secure: bool) -> String {
    let endpoint = endpoint.trim();
    let default_scheme = if secure { "https://" } else { "http://" };

    // `str::get` returns `None` instead of panicking when the prefix length
    // does not fall on a UTF-8 char boundary, so non-ASCII input is safe.
    let (scheme, host) = ["https://", "http://"]
        .iter()
        .copied()
        .find_map(|scheme| {
            endpoint
                .get(..scheme.len())
                .filter(|prefix| prefix.eq_ignore_ascii_case(scheme))
                .map(|_| (scheme, &endpoint[scheme.len()..]))
        })
        .unwrap_or((default_scheme, endpoint));

    format!("{scheme}{}", host.trim_end_matches('/'))
}
881
882/// Spawn the bus S3 sink in a background task.
883///
884/// The S3 sink runs non-blocking in its own task, processing batches
885/// of training records. It yields periodically to ensure LLM requests
886/// have priority for CPU/network resources.
887///
888/// Errors are logged but do not crash the application.
889pub fn spawn_bus_s3_sink(bus: Arc<AgentBus>) -> tokio::task::JoinHandle<()> {
890    tokio::spawn(async move {
891        match BusS3SinkConfig::from_env_or_vault().await {
892            Ok(config) => match BusS3Sink::from_config(bus, config).await {
893                Ok(sink) => {
894                    if let Err(e) = sink.run().await {
895                        error!(error = %e, "Bus S3 sink failed");
896                    }
897                }
898                Err(e) => {
899                    error!(error = %e, "Bus S3 sink failed to initialize");
900                }
901            },
902            Err(e) => {
903                warn!(
904                    error = %e,
905                    "Bus S3 sink not configured - set MINIO_*/CODETETHER_CHAT_SYNC_MINIO_* env vars or configure chat-sync-minio in Vault"
906                );
907            }
908        }
909    })
910}
911
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `BusEnvelope` stamped with the current time, so each test only
    /// spells out the fields it actually cares about.
    fn envelope(
        id: &'static str,
        topic: &'static str,
        sender_id: &'static str,
        correlation_id: Option<&'static str>,
        message: BusMessage,
    ) -> BusEnvelope {
        BusEnvelope {
            id: id.into(),
            topic: topic.into(),
            sender_id: sender_id.into(),
            correlation_id: correlation_id.map(Into::into),
            timestamp: Utc::now(),
            message,
        }
    }

    #[test]
    fn test_normalize_endpoint() {
        let cases = [
            ("localhost:9000", false, "http://localhost:9000"),
            ("localhost:9000", true, "https://localhost:9000"),
            ("http://localhost:9000/", false, "http://localhost:9000"),
            ("https://minio.example.com/", true, "https://minio.example.com"),
        ];

        for (input, secure, expected) in cases {
            assert_eq!(
                normalize_endpoint(input, secure),
                expected,
                "input={input:?} secure={secure}"
            );
        }
    }

    #[test]
    fn test_config_defaults() {
        let config = BusS3SinkConfig {
            endpoint: "http://localhost:9000".to_string(),
            access_key: "key".to_string(),
            secret_key: "secret".to_string(),
            bucket: "test".to_string(),
            prefix: default_prefix(),
            batch_size: default_batch_size(),
            flush_interval_secs: default_flush_interval_secs(),
            secure: false,
            ignore_cert: false,
        };

        assert_eq!(config.prefix, "training/");
        assert_eq!(config.batch_size, 100);
        assert_eq!(config.flush_interval_secs, 30);
    }

    #[test]
    fn test_training_record_tool_request() {
        let env = envelope(
            "env-1",
            "tools.read_file",
            "agent-0",
            Some("corr-1"),
            BusMessage::ToolRequest {
                request_id: "req-1".into(),
                agent_id: "agent-0".into(),
                tool_name: "read_file".into(),
                arguments: serde_json::json!({"path": "/src/main.rs"}),
            },
        );

        let record = envelope_to_training_record(&env);
        assert_eq!(record.role, "assistant");
        assert!(record.content.is_none());
        assert_eq!(record.metadata.bus_kind, "tool_request");

        let calls = record
            .tool_calls
            .expect("a tool request should produce tool_calls");
        assert_eq!(calls.len(), 1);
        let call = &calls[0];
        assert_eq!(call.function.name, "read_file");
        assert_eq!(call.call_type, "function");
    }

    #[test]
    fn test_training_record_tool_response() {
        let env = envelope(
            "env-2",
            "tools.read_file",
            "agent-0",
            Some("corr-1"),
            BusMessage::ToolResponse {
                request_id: "req-1".into(),
                agent_id: "agent-0".into(),
                tool_name: "read_file".into(),
                result: "fn main() {}".into(),
                success: true,
            },
        );

        let record = envelope_to_training_record(&env);
        assert_eq!(record.role, "tool");
        assert_eq!(record.content.as_deref(), Some("fn main() {}"));
        assert_eq!(record.tool_call_id.as_deref(), Some("req-1"));
        assert_eq!(record.name.as_deref(), Some("read_file"));
        assert_eq!(record.metadata.bus_kind, "tool_response");
    }

    #[test]
    fn test_training_record_agent_message() {
        let env = envelope(
            "env-3",
            "agent.planner",
            "coder",
            None,
            BusMessage::AgentMessage {
                from: "coder".into(),
                to: "planner".into(),
                parts: vec![Part::Text {
                    text: "I fixed the bug".into(),
                }],
            },
        );

        let record = envelope_to_training_record(&env);
        assert_eq!(record.role, "assistant");

        let content = record
            .content
            .as_deref()
            .expect("an agent message should carry content");
        assert!(content.contains("I fixed the bug"));
        assert!(content.contains("[coder → planner]"));
    }

    #[test]
    fn test_heartbeat_skipped_role() {
        let env = envelope(
            "env-4",
            "broadcast",
            "agent-0",
            None,
            BusMessage::Heartbeat {
                agent_id: "agent-0".into(),
                status: "ok".into(),
            },
        );

        // The conversion itself still yields a record for heartbeats; it is
        // flush_batch that drops them before upload.
        let record = envelope_to_training_record(&env);
        assert_eq!(record.role, "system");
        assert!(record.content.is_none());
    }
}