Skip to main content

codetether_agent/bus/
s3_sink.rs

1//! MinIO S3 Bus Sink
2//!
3//! Subscribes to the AgentBus and uploads all messages to MinIO S3
4//! as structured JSONL records suitable for LLM pretraining.
5//!
6//! Each bus message is transformed into a training-friendly record with:
7//! - Clear role attribution (system/user/assistant/tool)
8//! - Paired tool_call → tool_result sequences
9//! - Full, untruncated content (code, file paths, arguments, outputs)
10//! - Rich metadata for filtering, deduplication, and curriculum design
11//!
12//! The output format follows the OpenAI chat-completions schema so it can
13//! be fed directly into fine-tuning pipelines (SFT, DPO, RLHF).
14//!
15//! ## Usage
16//!
17//! ```rust,ignore
18//! use codetether_agent::bus::s3_sink::BusS3Sink;
19//!
20//! let sink = BusS3Sink::new(
21//!     bus.clone(),
22//!     "http://localhost:9000",
23//!     "access-key",
24//!     "secret-key",
25//!     "codetether-training",
26//!     "bus/",
27//! ).await?;
28//!
29//! // Runs forever, uploading bus messages to S3
30//! sink.run().await;
31//! ```
32
33use super::{AgentBus, BusEnvelope, BusMessage};
34use crate::a2a::types::Part;
35use crate::secrets;
36use anyhow::{Context, Result};
37use chrono::Utc;
38use minio::s3::builders::ObjectContent;
39use minio::s3::creds::StaticProvider;
40use minio::s3::http::BaseUrl;
41use minio::s3::types::S3Api;
42use minio::s3::{Client as MinioClient, ClientBuilder as MinioClientBuilder};
43use serde::{Deserialize, Serialize};
44use std::collections::BTreeMap;
45use std::sync::Arc;
46use tokio::sync::broadcast;
47use tokio::task;
48use tracing::{debug, error, info, warn};
49
50/// Configuration for the bus S3 sink
51#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct BusS3SinkConfig {
53    /// MinIO/S3 endpoint URL (e.g., "http://localhost:9000")
54    pub endpoint: String,
55    /// Access key
56    pub access_key: String,
57    /// Secret key
58    pub secret_key: String,
59    /// Bucket name for bus logs
60    pub bucket: String,
61    /// Path prefix within the bucket
62    #[serde(default = "default_prefix")]
63    pub prefix: String,
64    /// Batch size before flushing to S3
65    #[serde(default = "default_batch_size")]
66    pub batch_size: usize,
67    /// Max batch age in seconds before flushing
68    #[serde(default = "default_flush_interval_secs")]
69    pub flush_interval_secs: u64,
70    /// Whether to use SSL/TLS
71    #[serde(default)]
72    pub secure: bool,
73    /// Whether to ignore certificate errors (for self-signed certs)
74    #[serde(default)]
75    pub ignore_cert: bool,
76}
77
78fn default_prefix() -> String {
79    "training/".to_string()
80}
81
82fn default_batch_size() -> usize {
83    100
84}
85
86fn default_flush_interval_secs() -> u64 {
87    30
88}
89
90impl BusS3SinkConfig {
91    /// Create config from environment variables
92    ///
93    /// Required:
94    /// - `MINIO_ENDPOINT` or `CODETETHER_BUS_S3_ENDPOINT`
95    /// - `MINIO_ACCESS_KEY` or `CODETETHER_BUS_S3_ACCESS_KEY`
96    /// - `MINIO_SECRET_KEY` or `CODETETHER_BUS_S3_SECRET_KEY`
97    ///
98    /// Optional:
99    /// - `CODETETHER_BUS_S3_BUCKET` (default: "codetether-training")
100    /// - `CODETETHER_BUS_S3_PREFIX` (default: "bus/")
101    pub fn from_env() -> Result<Self> {
102        let endpoint = std::env::var("MINIO_ENDPOINT")
103            .or_else(|_| std::env::var("CODETETHER_BUS_S3_ENDPOINT"))
104            .context("MINIO_ENDPOINT or CODETETHER_BUS_S3_ENDPOINT required for bus S3 sink")?;
105
106        let access_key = std::env::var("MINIO_ACCESS_KEY")
107            .or_else(|_| std::env::var("CODETETHER_BUS_S3_ACCESS_KEY"))
108            .context("MINIO_ACCESS_KEY or CODETETHER_BUS_S3_ACCESS_KEY required")?;
109
110        let secret_key = std::env::var("MINIO_SECRET_KEY")
111            .or_else(|_| std::env::var("CODETETHER_BUS_S3_SECRET_KEY"))
112            .context("MINIO_SECRET_KEY or CODETETHER_BUS_S3_SECRET_KEY required")?;
113
114        Ok(Self {
115            endpoint,
116            access_key,
117            secret_key,
118            bucket: std::env::var("CODETETHER_BUS_S3_BUCKET")
119                .unwrap_or_else(|_| "codetether-training".to_string()),
120            prefix: std::env::var("CODETETHER_BUS_S3_PREFIX")
121                .unwrap_or_else(|_| "training/".to_string()),
122            batch_size: std::env::var("CODETETHER_BUS_S3_BATCH_SIZE")
123                .ok()
124                .and_then(|s| s.parse().ok())
125                .unwrap_or(100),
126            flush_interval_secs: std::env::var("CODETETHER_BUS_S3_FLUSH_SECS")
127                .ok()
128                .and_then(|s| s.parse().ok())
129                .unwrap_or(30),
130            secure: std::env::var("MINIO_SECURE")
131                .ok()
132                .map(|s| s.to_lowercase() == "true")
133                .unwrap_or(false),
134            ignore_cert: std::env::var("MINIO_IGNORE_CERT")
135                .ok()
136                .map(|s| s.to_lowercase() == "true")
137                .unwrap_or(false),
138        })
139    }
140
141    /// Create config by trying multiple credential sources in order:
142    ///
143    /// 1. Bus-specific env vars (`MINIO_ENDPOINT`, `CODETETHER_BUS_S3_ENDPOINT`)
144    /// 2. Chat-sync env vars (`CODETETHER_CHAT_SYNC_MINIO_ENDPOINT`)
145    /// 3. Vault secrets at `secret/codetether/providers/chat-sync-minio`
146    pub async fn from_env_or_vault() -> Result<Self> {
147        // Fast path: original env-only method
148        if let Ok(cfg) = Self::from_env() {
149            return Ok(cfg);
150        }
151
152        // Try chat-sync env vars
153        let endpoint = env_non_empty("CODETETHER_CHAT_SYNC_MINIO_ENDPOINT");
154        let access_key = env_non_empty("CODETETHER_CHAT_SYNC_MINIO_ACCESS_KEY");
155        let secret_key = env_non_empty("CODETETHER_CHAT_SYNC_MINIO_SECRET_KEY");
156
157        if let (Some(ep), Some(ak), Some(sk)) =
158            (endpoint.clone(), access_key.clone(), secret_key.clone())
159        {
160            info!("Bus S3 sink using chat-sync env vars");
161            return Ok(Self {
162                endpoint: ep,
163                access_key: ak,
164                secret_key: sk,
165                bucket: std::env::var("CODETETHER_BUS_S3_BUCKET")
166                    .unwrap_or_else(|_| "codetether-training".to_string()),
167                prefix: std::env::var("CODETETHER_BUS_S3_PREFIX")
168                    .unwrap_or_else(|_| "training/".to_string()),
169                batch_size: 100,
170                flush_interval_secs: 30,
171                secure: false,
172                ignore_cert: false,
173            });
174        }
175
176        // Try Vault: chat-sync-minio provider
177        if let Some(secrets) = secrets::get_provider_secrets("chat-sync-minio").await {
178            let ep = secrets
179                .base_url
180                .clone()
181                .or_else(|| vault_extra_str(&secrets, &["endpoint", "minio_endpoint", "url"]))
182                .filter(|s| !s.is_empty());
183            let ak = vault_extra_str(
184                &secrets,
185                &["access_key", "access_key_id", "minio_access_key"],
186            )
187            .or_else(|| secrets.api_key.clone())
188            .filter(|s| !s.is_empty());
189            let sk = vault_extra_str(
190                &secrets,
191                &["secret_key", "secret_access_key", "minio_secret_key"],
192            )
193            .filter(|s| !s.is_empty());
194
195            if let (Some(ep), Some(ak), Some(sk)) = (ep, ak, sk) {
196                info!("Bus S3 sink using Vault chat-sync-minio credentials");
197                return Ok(Self {
198                    endpoint: ep,
199                    access_key: ak,
200                    secret_key: sk,
201                    bucket: std::env::var("CODETETHER_BUS_S3_BUCKET")
202                        .unwrap_or_else(|_| "codetether-training".to_string()),
203                    prefix: std::env::var("CODETETHER_BUS_S3_PREFIX")
204                        .unwrap_or_else(|_| "training/".to_string()),
205                    batch_size: 100,
206                    flush_interval_secs: 30,
207                    secure: false,
208                    ignore_cert: false,
209                });
210            }
211        }
212
213        anyhow::bail!(
214            "No MinIO credentials found. Set MINIO_ENDPOINT/MINIO_ACCESS_KEY/MINIO_SECRET_KEY, \
215             CODETETHER_CHAT_SYNC_MINIO_* env vars, or configure chat-sync-minio in Vault."
216        )
217    }
218}
219
220/// Read an env var, returning `None` if unset or empty.
221fn env_non_empty(key: &str) -> Option<String> {
222    std::env::var(key).ok().filter(|s| !s.is_empty())
223}
224
225/// Extract a string value from `ProviderSecrets.extra`, trying multiple key names.
226fn vault_extra_str(secrets: &secrets::ProviderSecrets, keys: &[&str]) -> Option<String> {
227    for key in keys {
228        if let Some(val) = secrets.extra.get(*key)
229            && let Some(s) = val.as_str()
230            && !s.is_empty()
231        {
232            return Some(s.to_string());
233        }
234    }
235    None
236}
237
238// ─── LLM Pretraining Record ─────────────────────────────────────────────
239
240/// A single training record in OpenAI chat-completions format.
241///
242/// Each bus envelope maps to one `TrainingRecord` line in the JSONL output.
243/// The schema is compatible with OpenAI fine-tuning, Axolotl, and similar
244/// pipelines so the data can be used directly for SFT / DPO / RLHF.
245#[derive(Debug, Clone, Serialize, Deserialize)]
246struct TrainingRecord {
247    /// "system" | "user" | "assistant" | "tool"
248    role: String,
249    /// Primary text content
250    #[serde(skip_serializing_if = "Option::is_none")]
251    content: Option<String>,
252    /// Tool calls made by the assistant (only when role == "assistant")
253    #[serde(skip_serializing_if = "Option::is_none")]
254    tool_calls: Option<Vec<TrainingToolCall>>,
255    /// Tool call id this result corresponds to (only when role == "tool")
256    #[serde(skip_serializing_if = "Option::is_none")]
257    tool_call_id: Option<String>,
258    /// Tool name (only when role == "tool")
259    #[serde(skip_serializing_if = "Option::is_none")]
260    name: Option<String>,
261    /// Envelope metadata for filtering and curriculum design
262    metadata: TrainingMetadata,
263}
264
265/// Represents a tool call in the assistant's response.
266#[derive(Debug, Clone, Serialize, Deserialize)]
267struct TrainingToolCall {
268    /// Unique id for this tool call
269    id: String,
270    /// Always "function"
271    #[serde(rename = "type")]
272    call_type: String,
273    /// The function being called
274    function: TrainingFunction,
275}
276
277/// Function name + arguments inside a tool call.
278#[derive(Debug, Clone, Serialize, Deserialize)]
279struct TrainingFunction {
280    name: String,
281    arguments: String,
282}
283
284/// Provenance metadata attached to every training record.
285#[derive(Debug, Clone, Serialize, Deserialize)]
286struct TrainingMetadata {
287    /// Original BusMessage variant name (snake_case)
288    bus_kind: String,
289    /// Envelope id
290    envelope_id: String,
291    /// ISO 8601 timestamp
292    timestamp: String,
293    /// Hierarchical topic
294    topic: String,
295    /// Agent that originated the message
296    sender_id: String,
297    /// Correlation id linking request ↔ response
298    #[serde(skip_serializing_if = "Option::is_none")]
299    correlation_id: Option<String>,
300    /// Agent loop step number when available
301    #[serde(skip_serializing_if = "Option::is_none")]
302    step: Option<usize>,
303}
304
305/// Convert a `BusEnvelope` into a `TrainingRecord`.
306fn envelope_step(message: &BusMessage) -> Option<usize> {
307    match message {
308        BusMessage::ToolRequest { step, .. }
309        | BusMessage::ToolResponse { step, .. }
310        | BusMessage::ToolOutputFull { step, .. }
311        | BusMessage::AgentThinking { step, .. }
312        | BusMessage::RalphLearning {
313            iteration: step, ..
314        }
315        | BusMessage::RalphProgress {
316            iteration: step, ..
317        } => Some(*step),
318        BusMessage::AgentReady { .. }
319        | BusMessage::AgentShutdown { .. }
320        | BusMessage::AgentMessage { .. }
321        | BusMessage::TaskUpdate { .. }
322        | BusMessage::ArtifactUpdate { .. }
323        | BusMessage::SharedResult { .. }
324        | BusMessage::Heartbeat { .. }
325        | BusMessage::RalphHandoff { .. }
326        | BusMessage::VoiceSessionStarted { .. }
327        | BusMessage::VoiceTranscript { .. }
328        | BusMessage::VoiceAgentStateChanged { .. }
329        | BusMessage::VoiceSessionEnded { .. } => None,
330    }
331}
332
333fn envelope_to_training_record(env: &BusEnvelope) -> TrainingRecord {
334    let meta = TrainingMetadata {
335        bus_kind: bus_message_kind(&env.message),
336        envelope_id: env.id.clone(),
337        timestamp: env.timestamp.to_rfc3339(),
338        topic: env.topic.clone(),
339        sender_id: env.sender_id.clone(),
340        correlation_id: env.correlation_id.clone(),
341        step: envelope_step(&env.message),
342    };
343
344    match &env.message {
345        // ── Agent lifecycle → system ────────────────────────────────
346        BusMessage::AgentReady {
347            agent_id,
348            capabilities,
349        } => TrainingRecord {
350            role: "system".into(),
351            content: Some(format!(
352                "Agent `{agent_id}` ready. Capabilities: {}",
353                capabilities.join(", ")
354            )),
355            tool_calls: None,
356            tool_call_id: None,
357            name: None,
358            metadata: meta,
359        },
360
361        BusMessage::AgentShutdown { agent_id } => TrainingRecord {
362            role: "system".into(),
363            content: Some(format!("Agent `{agent_id}` shutting down.")),
364            tool_calls: None,
365            tool_call_id: None,
366            name: None,
367            metadata: meta,
368        },
369
370        // ── Agent messages → assistant ──────────────────────────────
371        BusMessage::AgentMessage { from, to, parts } => {
372            let text = parts_to_text(parts);
373            TrainingRecord {
374                role: "assistant".into(),
375                content: Some(format!("[{from} → {to}] {text}")),
376                tool_calls: None,
377                tool_call_id: None,
378                name: None,
379                metadata: meta,
380            }
381        }
382
383        // ── Task/artifact lifecycle → system ────────────────────────
384        BusMessage::TaskUpdate {
385            task_id,
386            state,
387            message,
388        } => {
389            let msg = message.as_deref().unwrap_or("");
390            TrainingRecord {
391                role: "system".into(),
392                content: Some(format!("Task `{task_id}` → {state:?}. {msg}")),
393                tool_calls: None,
394                tool_call_id: None,
395                name: None,
396                metadata: meta,
397            }
398        }
399
400        BusMessage::ArtifactUpdate { task_id, artifact } => {
401            let artifact_text = parts_to_text(&artifact.parts);
402            TrainingRecord {
403                role: "system".into(),
404                content: Some(format!(
405                    "Task `{task_id}` artifact `{}`: {artifact_text}",
406                    artifact.artifact_id
407                )),
408                tool_calls: None,
409                tool_call_id: None,
410                name: None,
411                metadata: meta,
412            }
413        }
414
415        // ── Shared results → system ────────────────────────────────
416        BusMessage::SharedResult { key, value, tags } => TrainingRecord {
417            role: "system".into(),
418            content: Some(format!(
419                "Shared result `{key}` [{}]: {}",
420                tags.join(", "),
421                serde_json::to_string(value).unwrap_or_default()
422            )),
423            tool_calls: None,
424            tool_call_id: None,
425            name: None,
426            metadata: meta,
427        },
428
429        // ── Tool request → assistant with tool_calls ────────────────
430        BusMessage::ToolRequest {
431            request_id,
432            tool_name,
433            arguments,
434            ..
435        } => TrainingRecord {
436            role: "assistant".into(),
437            content: None,
438            tool_calls: Some(vec![TrainingToolCall {
439                id: request_id.clone(),
440                call_type: "function".into(),
441                function: TrainingFunction {
442                    name: tool_name.clone(),
443                    arguments: serde_json::to_string(arguments).unwrap_or_default(),
444                },
445            }]),
446            tool_call_id: None,
447            name: None,
448            metadata: meta,
449        },
450
451        // ── Tool response → tool role ───────────────────────────────
452        BusMessage::ToolResponse {
453            request_id,
454            tool_name,
455            result,
456            success,
457            ..
458        } => TrainingRecord {
459            role: "tool".into(),
460            content: Some(if *success {
461                result.clone()
462            } else {
463                format!("[ERROR] {result}")
464            }),
465            tool_calls: None,
466            tool_call_id: Some(request_id.clone()),
467            name: Some(tool_name.clone()),
468            metadata: meta,
469        },
470
471        // ── Full tool output → tool role (untruncated) ──────────────
472        BusMessage::ToolOutputFull {
473            agent_id,
474            tool_name,
475            output,
476            success,
477            step,
478        } => TrainingRecord {
479            role: "tool".into(),
480            content: Some(if *success {
481                format!("[step {step}, agent {agent_id}] {output}")
482            } else {
483                format!("[step {step}, agent {agent_id}, ERROR] {output}")
484            }),
485            tool_calls: None,
486            tool_call_id: None,
487            name: Some(tool_name.clone()),
488            metadata: meta,
489        },
490
491        // ── Heartbeat → system (filtered out during flush) ──────────
492        BusMessage::Heartbeat { .. } => TrainingRecord {
493            role: "system".into(),
494            content: None,
495            tool_calls: None,
496            tool_call_id: None,
497            name: None,
498            metadata: meta,
499        },
500
501        // ── Ralph learnings → system context ────────────────────────
502        BusMessage::RalphLearning {
503            prd_id,
504            story_id,
505            iteration,
506            learnings,
507            context,
508        } => TrainingRecord {
509            role: "system".into(),
510            content: Some(format!(
511                "Ralph learning (PRD {prd_id}, story {story_id}, iter {iteration}):\n{}\nContext: {}",
512                learnings
513                    .iter()
514                    .map(|l| format!("- {l}"))
515                    .collect::<Vec<_>>()
516                    .join("\n"),
517                serde_json::to_string(context).unwrap_or_default()
518            )),
519            tool_calls: None,
520            tool_call_id: None,
521            name: None,
522            metadata: meta,
523        },
524
525        BusMessage::RalphHandoff {
526            prd_id,
527            from_story,
528            to_story,
529            context,
530            progress_summary,
531        } => TrainingRecord {
532            role: "system".into(),
533            content: Some(format!(
534                "Ralph handoff (PRD {prd_id}): {from_story} → {to_story}\nSummary: {progress_summary}\nContext: {}",
535                serde_json::to_string(context).unwrap_or_default()
536            )),
537            tool_calls: None,
538            tool_call_id: None,
539            name: None,
540            metadata: meta,
541        },
542
543        BusMessage::RalphProgress {
544            prd_id,
545            passed,
546            total,
547            iteration,
548            status,
549        } => TrainingRecord {
550            role: "system".into(),
551            content: Some(format!(
552                "Ralph progress (PRD {prd_id}): {passed}/{total} stories passed, iter {iteration}, status: {status}"
553            )),
554            tool_calls: None,
555            tool_call_id: None,
556            name: None,
557            metadata: meta,
558        },
559
560        // ── Agent thinking → assistant reasoning ────────────────────
561        BusMessage::AgentThinking {
562            agent_id,
563            thinking,
564            step,
565        } => TrainingRecord {
566            role: "assistant".into(),
567            content: Some(format!("<thinking>\n{thinking}\n</thinking>")),
568            tool_calls: None,
569            tool_call_id: None,
570            name: Some(format!("reasoning.{agent_id}.step_{step}")),
571            metadata: meta,
572        },
573
574        // ── Voice session lifecycle → system ────────────────────────
575        BusMessage::VoiceSessionStarted {
576            room_name,
577            agent_id,
578            voice_id,
579        } => TrainingRecord {
580            role: "system".into(),
581            content: Some(format!(
582                "Voice session started: room={room_name}, agent={agent_id}, voice={voice_id}"
583            )),
584            tool_calls: None,
585            tool_call_id: None,
586            name: None,
587            metadata: meta,
588        },
589
590        BusMessage::VoiceTranscript {
591            room_name,
592            text,
593            role,
594            is_final,
595        } => TrainingRecord {
596            role: if role == "user" {
597                "user".into()
598            } else {
599                "assistant".into()
600            },
601            content: Some(format!(
602                "[voice:{room_name}{}] {text}",
603                if *is_final { " final" } else { "" }
604            )),
605            tool_calls: None,
606            tool_call_id: None,
607            name: None,
608            metadata: meta,
609        },
610
611        BusMessage::VoiceAgentStateChanged { room_name, state } => TrainingRecord {
612            role: "system".into(),
613            content: Some(format!("Voice agent state: room={room_name} → {state}")),
614            tool_calls: None,
615            tool_call_id: None,
616            name: None,
617            metadata: meta,
618        },
619
620        BusMessage::VoiceSessionEnded { room_name, reason } => TrainingRecord {
621            role: "system".into(),
622            content: Some(format!(
623                "Voice session ended: room={room_name}, reason={reason}"
624            )),
625            tool_calls: None,
626            tool_call_id: None,
627            name: None,
628            metadata: meta,
629        },
630    }
631}
632
633type ToolGroupKey = (String, usize, Option<String>);
634
635fn collect_training_records(envelopes: &[BusEnvelope]) -> Vec<TrainingRecord> {
636    let mut grouped_records: BTreeMap<ToolGroupKey, Vec<TrainingRecord>> = BTreeMap::new();
637    let mut passthrough_records = Vec::new();
638
639    for env in envelopes {
640        if matches!(env.message, BusMessage::Heartbeat { .. }) {
641            continue;
642        }
643        let record = envelope_to_training_record(env);
644        if let Some(step) = record.metadata.step
645            && matches!(
646                env.message,
647                BusMessage::ToolRequest { .. } | BusMessage::ToolResponse { .. }
648            )
649        {
650            grouped_records
651                .entry((
652                    record.metadata.sender_id.clone(),
653                    step,
654                    record.metadata.correlation_id.clone(),
655                ))
656                .or_default()
657                .push(record);
658            continue;
659        }
660        passthrough_records.push(record);
661    }
662
663    let mut records = passthrough_records;
664    for (_key, mut group) in grouped_records {
665        append_training_group(&mut group, &mut records);
666    }
667    records
668}
669
670fn append_training_group(records: &mut [TrainingRecord], output: &mut Vec<TrainingRecord>) {
671    if records.is_empty() {
672        return;
673    }
674
675    let assistant_prefix_len = records
676        .iter()
677        .take_while(|record| {
678            record.role == "assistant"
679                && record
680                    .tool_calls
681                    .as_ref()
682                    .is_some_and(|calls| calls.len() == 1)
683        })
684        .count();
685    let tool_suffix_len = records[assistant_prefix_len..]
686        .iter()
687        .take_while(|record| record.role == "tool" && record.tool_call_id.is_some())
688        .count();
689    let can_merge = assistant_prefix_len > 0
690        && tool_suffix_len > 0
691        && assistant_prefix_len + tool_suffix_len == records.len();
692
693    if can_merge {
694        let mut merged = records[0].clone();
695        let mut tool_calls = Vec::with_capacity(assistant_prefix_len);
696        let mut envelope_ids = Vec::with_capacity(assistant_prefix_len);
697        let mut contents = Vec::new();
698
699        for record in records.iter().take(assistant_prefix_len) {
700            envelope_ids.push(record.metadata.envelope_id.clone());
701            if let Some(content) = record
702                .content
703                .as_ref()
704                .filter(|content| !content.is_empty())
705            {
706                contents.push(content.clone());
707            }
708            if let Some(mut calls) = record.tool_calls.clone() {
709                tool_calls.append(&mut calls);
710            }
711        }
712
713        merged.tool_calls = Some(tool_calls);
714        merged.content = if contents.is_empty() {
715            None
716        } else {
717            Some(contents.join("\n"))
718        };
719        merged.metadata.bus_kind = "tool_request_batch".into();
720        merged.metadata.envelope_id = envelope_ids.join(",");
721
722        output.push(merged);
723        output.extend(records.iter().skip(assistant_prefix_len).cloned());
724        return;
725    }
726
727    output.extend(records.iter().cloned());
728}
729
730fn serialize_training_records(records: &[TrainingRecord]) -> Vec<String> {
731    records
732        .iter()
733        .filter_map(|record| serde_json::to_string(record).ok())
734        .collect()
735}
736
737fn build_s3_key(prefix: &str, now: chrono::DateTime<Utc>) -> String {
738    let prefix = if prefix.is_empty() {
739        String::new()
740    } else if prefix.ends_with('/') {
741        prefix.to_string()
742    } else {
743        format!("{prefix}/")
744    };
745    let date_path = now.format("%Y/%m/%d/%H").to_string();
746    let timestamp = now.format("%Y%m%dT%H%M%S").to_string();
747    let uuid = uuid::Uuid::new_v4();
748    format!("{prefix}v2/{date_path}/batch_{timestamp}_{uuid}.jsonl")
749}
750
751/// Extract the serde tag name from a `BusMessage` variant.
752fn bus_message_kind(msg: &BusMessage) -> String {
753    serde_json::to_value(msg)
754        .ok()
755        .and_then(|v| v.get("kind").and_then(|k| k.as_str()).map(String::from))
756        .unwrap_or_else(|| "unknown".into())
757}
758
759/// Concatenate `Part` items into a single text string.
760fn parts_to_text(parts: &[Part]) -> String {
761    parts
762        .iter()
763        .map(|p| match p {
764            Part::Text { text } => text.as_str(),
765            Part::Data { .. } => "<<data>>",
766            Part::File { .. } => "<<file>>",
767        })
768        .collect::<Vec<_>>()
769        .join("\n")
770}
771
772// ─── S3 Sink ─────────────────────────────────────────────────────────────
773
774/// S3 sink that archives all bus messages as JSONL training records
775pub struct BusS3Sink {
776    #[allow(dead_code)]
777    bus: Arc<AgentBus>,
778    client: MinioClient,
779    config: BusS3SinkConfig,
780    rx: broadcast::Receiver<BusEnvelope>,
781}
782
783impl BusS3Sink {
784    #[allow(dead_code)]
785    /// Create a new bus S3 sink
786    pub async fn new(
787        bus: Arc<AgentBus>,
788        endpoint: &str,
789        access_key: &str,
790        secret_key: &str,
791        bucket: &str,
792        prefix: &str,
793    ) -> Result<Self> {
794        let config = BusS3SinkConfig {
795            endpoint: endpoint.to_string(),
796            access_key: access_key.to_string(),
797            secret_key: secret_key.to_string(),
798            bucket: bucket.to_string(),
799            prefix: prefix.to_string(),
800            batch_size: 100,
801            flush_interval_secs: 30,
802            secure: endpoint.starts_with("https"),
803            ignore_cert: false,
804        };
805
806        Self::from_config(bus, config).await
807    }
808
809    /// Create sink from configuration
810    pub async fn from_config(bus: Arc<AgentBus>, config: BusS3SinkConfig) -> Result<Self> {
811        let endpoint = normalize_endpoint(&config.endpoint, config.secure);
812
813        let base_url: BaseUrl = endpoint.parse().context("Invalid MinIO endpoint URL")?;
814
815        let static_provider = StaticProvider::new(&config.access_key, &config.secret_key, None);
816
817        let client = MinioClientBuilder::new(base_url)
818            .provider(Some(Box::new(static_provider)))
819            .ignore_cert_check(Some(config.ignore_cert))
820            .build()?;
821
822        let rx = bus.tx.subscribe();
823
824        Ok(Self {
825            bus,
826            client,
827            config,
828            rx,
829        })
830    }
831
832    #[allow(dead_code)]
833    /// Create sink from environment variables
834    pub async fn from_env(bus: Arc<AgentBus>) -> Result<Self> {
835        let config = BusS3SinkConfig::from_env()?;
836        Self::from_config(bus, config).await
837    }
838
839    /// Ensure the bucket exists, creating it if necessary
840    pub async fn ensure_bucket(&self) -> Result<()> {
841        match self.client.bucket_exists(&self.config.bucket).send().await {
842            Ok(resp) if resp.exists => {
843                debug!(bucket = %self.config.bucket, "S3 bucket exists");
844            }
845            Ok(_) => {
846                info!(bucket = %self.config.bucket, "Creating S3 bucket");
847                match self.client.create_bucket(&self.config.bucket).send().await {
848                    Ok(_) => {}
849                    Err(e) => {
850                        let err_text = e.to_string();
851                        if !err_text.contains("BucketAlreadyOwnedByYou")
852                            && !err_text.contains("BucketAlreadyExists")
853                        {
854                            return Err(anyhow::anyhow!("Failed to create bucket: {err_text}"));
855                        }
856                        debug!(bucket = %self.config.bucket, "Bucket already exists");
857                    }
858                }
859            }
860            Err(e) => {
861                debug!(error = %e, bucket = %self.config.bucket, "Bucket check returned error (may already exist)");
862            }
863        }
864        Ok(())
865    }
866
867    /// Run the sink loop - subscribes to bus and uploads batches to S3
868    pub async fn run(mut self) -> Result<()> {
869        self.ensure_bucket().await?;
870
871        info!(
872            bucket = %self.config.bucket,
873            prefix = %self.config.prefix,
874            batch_size = self.config.batch_size,
875            flush_secs = self.config.flush_interval_secs,
876            "Bus S3 sink started (JSONL training record format)"
877        );
878
879        let mut batch: Vec<BusEnvelope> = Vec::with_capacity(self.config.batch_size);
880        let mut batch_start: Option<String> = None;
881        let mut flush_interval = tokio::time::interval(std::time::Duration::from_secs(
882            self.config.flush_interval_secs,
883        ));
884
885        loop {
886            tokio::select! {
887                result = self.rx.recv() => {
888                    match result {
889                        Ok(envelope) => {
890                            if batch_start.is_none() {
891                                batch_start = Some(envelope.timestamp.to_rfc3339());
892                            }
893                            batch.push(envelope);
894
895                            if batch.len() >= self.config.batch_size {
896                                if let Err(e) = self.flush_batch(&mut batch, &mut batch_start).await {
897                                    error!(error = %e, "Failed to flush batch");
898                                }
899                                // Yield to allow LLM requests priority access to executor
900                                task::yield_now().await;
901                            }
902                        }
903                        Err(broadcast::error::RecvError::Lagged(n)) => {
904                            warn!(skipped = n, "Bus S3 sink lagged, some messages dropped");
905                        }
906                        Err(broadcast::error::RecvError::Closed) => {
907                            info!("Bus channel closed, shutting down S3 sink");
908                            if !batch.is_empty()
909                                && let Err(e) = self.flush_batch(&mut batch, &mut batch_start).await {
910                                    error!(error = %e, "Failed to flush final batch");
911                                }
912                            return Ok(());
913                        }
914                    }
915                }
916
917                _ = flush_interval.tick() => {
918                    if !batch.is_empty() {
919                        if let Err(e) = self.flush_batch(&mut batch, &mut batch_start).await {
920                            error!(error = %e, "Failed to flush batch on interval");
921                        }
922                        // Yield to allow LLM requests priority access to executor
923                        task::yield_now().await;
924                    }
925                }
926            }
927        }
928    }
929
930    /// Flush the current batch to S3 as JSONL (one training record per line).
931    ///
932    /// Heartbeats are filtered out as noise. Every other envelope is
933    /// transformed into a `TrainingRecord` and serialized as a single JSON
934    /// line, making the file directly ingestible by LLM fine-tuning tools.
935    async fn flush_batch(
936        &self,
937        batch: &mut Vec<BusEnvelope>,
938        batch_start: &mut Option<String>,
939    ) -> Result<()> {
940        if batch.is_empty() {
941            return Ok(());
942        }
943
944        let _start_time = batch_start
945            .take()
946            .unwrap_or_else(|| Utc::now().to_rfc3339());
947        let envelopes = std::mem::take(batch);
948
949        // Build JSONL: buffer tool requests/responses by agent step and
950        // correlation id so separate turns do not merge when a sender reuses
951        // the same step number. Legacy producers with `None` correlation ids
952        // still group together compatibly.
953        let records = collect_training_records(&envelopes);
954        let lines = serialize_training_records(&records);
955        if lines.is_empty() {
956            return Ok(());
957        }
958
959        let count = lines.len();
960        let jsonl = lines.join("\n");
961
962        // S3 key: prefix/v2/YYYY/MM/DD/HH/batch_YYYYMMDDTHHMMSS_uuid.jsonl
963        let now = Utc::now();
964        let s3_key = build_s3_key(&self.config.prefix, now);
965
966        let content = ObjectContent::from(jsonl.into_bytes());
967
968        match self
969            .client
970            .put_object_content(&self.config.bucket, &s3_key, content)
971            .send()
972            .await
973        {
974            Ok(_) => {
975                info!(
976                    bucket = %self.config.bucket,
977                    key = %s3_key,
978                    records = count,
979                    "Uploaded training records to S3"
980                );
981            }
982            Err(e) => {
983                error!(
984                    bucket = %self.config.bucket,
985                    key = %s3_key,
986                    error = %e,
987                    "Failed to upload training records to S3"
988                );
989                return Err(anyhow::anyhow!("S3 upload failed: {e}"));
990            }
991        }
992
993        Ok(())
994    }
995
996    #[allow(dead_code)]
997    /// Get the bucket name
998    pub fn bucket(&self) -> &str {
999        &self.config.bucket
1000    }
1001
1002    #[allow(dead_code)]
1003    /// Get the prefix
1004    pub fn prefix(&self) -> &str {
1005        &self.config.prefix
1006    }
1007}
1008
1009/// Normalize endpoint URL (ensure protocol, remove trailing slash)
1010fn normalize_endpoint(endpoint: &str, secure: bool) -> String {
1011    let endpoint = endpoint.trim_end_matches('/');
1012
1013    if endpoint.starts_with("http://") || endpoint.starts_with("https://") {
1014        endpoint.to_string()
1015    } else if secure {
1016        format!("https://{endpoint}")
1017    } else {
1018        format!("http://{endpoint}")
1019    }
1020}
1021
1022/// Spawn the bus S3 sink in a background task.
1023///
1024/// The S3 sink runs non-blocking in its own task, processing batches
1025/// of training records. It yields periodically to ensure LLM requests
1026/// have priority for CPU/network resources.
1027///
1028/// Errors are logged but do not crash the application.
1029pub fn spawn_bus_s3_sink(bus: Arc<AgentBus>) -> tokio::task::JoinHandle<()> {
1030    tokio::spawn(async move {
1031        match BusS3SinkConfig::from_env_or_vault().await {
1032            Ok(config) => match BusS3Sink::from_config(bus, config).await {
1033                Ok(sink) => {
1034                    if let Err(e) = sink.run().await {
1035                        error!(error = %e, "Bus S3 sink failed");
1036                    }
1037                }
1038                Err(e) => {
1039                    error!(error = %e, "Bus S3 sink failed to initialize");
1040                }
1041            },
1042            Err(e) => {
1043                warn!(
1044                    error = %e,
1045                    "Bus S3 sink not configured - set MINIO_*/CODETETHER_CHAT_SYNC_MINIO_* env vars or configure chat-sync-minio in Vault"
1046                );
1047            }
1048        }
1049    })
1050}
1051
1052#[cfg(test)]
1053mod tests {
1054    use super::*;
1055    use serde_json::Value;
1056
1057    #[test]
1058    fn test_normalize_endpoint() {
1059        assert_eq!(
1060            normalize_endpoint("localhost:9000", false),
1061            "http://localhost:9000"
1062        );
1063        assert_eq!(
1064            normalize_endpoint("localhost:9000", true),
1065            "https://localhost:9000"
1066        );
1067        assert_eq!(
1068            normalize_endpoint("http://localhost:9000/", false),
1069            "http://localhost:9000"
1070        );
1071        assert_eq!(
1072            normalize_endpoint("https://minio.example.com/", true),
1073            "https://minio.example.com"
1074        );
1075    }
1076
1077    #[test]
1078    fn test_config_defaults() {
1079        let config = BusS3SinkConfig {
1080            endpoint: "http://localhost:9000".to_string(),
1081            access_key: "key".to_string(),
1082            secret_key: "secret".to_string(),
1083            bucket: "test".to_string(),
1084            prefix: default_prefix(),
1085            batch_size: default_batch_size(),
1086            flush_interval_secs: default_flush_interval_secs(),
1087            secure: false,
1088            ignore_cert: false,
1089        };
1090
1091        assert_eq!(config.prefix, "training/");
1092        assert_eq!(config.batch_size, 100);
1093        assert_eq!(config.flush_interval_secs, 30);
1094    }
1095
1096    #[test]
1097    fn test_training_record_tool_request() {
1098        let env = BusEnvelope {
1099            id: "env-1".into(),
1100            topic: "tools.read_file".into(),
1101            sender_id: "agent-0".into(),
1102            correlation_id: Some("corr-1".into()),
1103            timestamp: Utc::now(),
1104            message: BusMessage::ToolRequest {
1105                request_id: "req-1".into(),
1106                agent_id: "agent-0".into(),
1107                tool_name: "read_file".into(),
1108                arguments: serde_json::json!({"path": "/src/main.rs"}),
1109                step: 1,
1110            },
1111        };
1112
1113        let record = envelope_to_training_record(&env);
1114        assert_eq!(record.role, "assistant");
1115        assert!(record.content.is_none());
1116        let calls = record.tool_calls.unwrap();
1117        assert_eq!(calls.len(), 1);
1118        assert_eq!(calls[0].function.name, "read_file");
1119        assert_eq!(calls[0].call_type, "function");
1120        assert_eq!(record.metadata.bus_kind, "tool_request");
1121    }
1122
1123    #[test]
1124    fn test_training_record_tool_response() {
1125        let env = BusEnvelope {
1126            id: "env-2".into(),
1127            topic: "tools.read_file".into(),
1128            sender_id: "agent-0".into(),
1129            correlation_id: Some("corr-1".into()),
1130            timestamp: Utc::now(),
1131            message: BusMessage::ToolResponse {
1132                request_id: "req-1".into(),
1133                agent_id: "agent-0".into(),
1134                tool_name: "read_file".into(),
1135                result: "fn main() {}".into(),
1136                success: true,
1137                step: 1,
1138            },
1139        };
1140
1141        let record = envelope_to_training_record(&env);
1142        assert_eq!(record.role, "tool");
1143        assert_eq!(record.content.as_deref(), Some("fn main() {}"));
1144        assert_eq!(record.tool_call_id.as_deref(), Some("req-1"));
1145        assert_eq!(record.name.as_deref(), Some("read_file"));
1146        assert_eq!(record.metadata.bus_kind, "tool_response");
1147    }
1148
1149    #[test]
1150    fn test_training_record_agent_message() {
1151        let env = BusEnvelope {
1152            id: "env-3".into(),
1153            topic: "agent.planner".into(),
1154            sender_id: "coder".into(),
1155            correlation_id: None,
1156            timestamp: Utc::now(),
1157            message: BusMessage::AgentMessage {
1158                from: "coder".into(),
1159                to: "planner".into(),
1160                parts: vec![Part::Text {
1161                    text: "I fixed the bug".into(),
1162                }],
1163            },
1164        };
1165
1166        let record = envelope_to_training_record(&env);
1167        assert_eq!(record.role, "assistant");
1168        assert!(
1169            record
1170                .content
1171                .as_deref()
1172                .unwrap()
1173                .contains("I fixed the bug")
1174        );
1175        assert!(
1176            record
1177                .content
1178                .as_deref()
1179                .unwrap()
1180                .contains("[coder → planner]")
1181        );
1182    }
1183
1184    #[test]
1185    fn test_heartbeat_skipped_role() {
1186        let env = BusEnvelope {
1187            id: "env-4".into(),
1188            topic: "broadcast".into(),
1189            sender_id: "agent-0".into(),
1190            correlation_id: None,
1191            timestamp: Utc::now(),
1192            message: BusMessage::Heartbeat {
1193                agent_id: "agent-0".into(),
1194                status: "ok".into(),
1195            },
1196        };
1197
1198        let record = envelope_to_training_record(&env);
1199        // Heartbeats produce a record but flush_batch filters them out
1200        assert_eq!(record.role, "system");
1201        assert!(record.content.is_none());
1202    }
1203
1204    #[test]
1205    fn test_tool_grouping_keeps_different_correlation_ids_separate() {
1206        let envelopes = vec![
1207            tool_request_envelope("env-1", "req-1", Some("turn-1")),
1208            tool_response_envelope("env-2", "req-1", Some("turn-1")),
1209            tool_request_envelope("env-3", "req-2", Some("turn-2")),
1210            tool_response_envelope("env-4", "req-2", Some("turn-2")),
1211        ];
1212
1213        let records = collect_training_records(&envelopes);
1214        let assistant_batches: Vec<_> = records
1215            .iter()
1216            .filter(|record| record.metadata.bus_kind == "tool_request_batch")
1217            .collect();
1218
1219        assert_eq!(records.len(), 4);
1220        assert_eq!(assistant_batches.len(), 2);
1221        assert_eq!(
1222            assistant_batches[0].metadata.correlation_id.as_deref(),
1223            Some("turn-1")
1224        );
1225        assert_eq!(
1226            assistant_batches[1].metadata.correlation_id.as_deref(),
1227            Some("turn-2")
1228        );
1229        assert_eq!(assistant_batches[0].tool_calls.as_ref().unwrap().len(), 1);
1230        assert_eq!(assistant_batches[1].tool_calls.as_ref().unwrap().len(), 1);
1231    }
1232
1233    #[test]
1234    fn test_tool_grouping_keeps_legacy_none_correlation_compatible() {
1235        let envelopes = vec![
1236            tool_request_envelope("env-1", "req-1", None),
1237            tool_request_envelope("env-2", "req-2", None),
1238            tool_response_envelope("env-3", "req-1", None),
1239            tool_response_envelope("env-4", "req-2", None),
1240        ];
1241
1242        let records = collect_training_records(&envelopes);
1243        let assistant_batches: Vec<_> = records
1244            .iter()
1245            .filter(|record| record.metadata.bus_kind == "tool_request_batch")
1246            .collect();
1247
1248        assert_eq!(records.len(), 3);
1249        assert_eq!(assistant_batches.len(), 1);
1250        assert!(assistant_batches[0].metadata.correlation_id.is_none());
1251        assert_eq!(assistant_batches[0].tool_calls.as_ref().unwrap().len(), 2);
1252    }
1253
1254    fn tool_request_envelope(
1255        envelope_id: &str,
1256        request_id: &str,
1257        correlation_id: Option<&str>,
1258    ) -> BusEnvelope {
1259        BusEnvelope {
1260            id: envelope_id.into(),
1261            topic: "tools.read_file".into(),
1262            sender_id: "agent-0".into(),
1263            correlation_id: correlation_id.map(str::to_string),
1264            timestamp: Utc::now(),
1265            message: BusMessage::ToolRequest {
1266                request_id: request_id.into(),
1267                agent_id: "agent-0".into(),
1268                tool_name: "read_file".into(),
1269                arguments: Value::Null,
1270                step: 1,
1271            },
1272        }
1273    }
1274
1275    fn tool_response_envelope(
1276        envelope_id: &str,
1277        request_id: &str,
1278        correlation_id: Option<&str>,
1279    ) -> BusEnvelope {
1280        BusEnvelope {
1281            id: envelope_id.into(),
1282            topic: "tools.read_file".into(),
1283            sender_id: "agent-0".into(),
1284            correlation_id: correlation_id.map(str::to_string),
1285            timestamp: Utc::now(),
1286            message: BusMessage::ToolResponse {
1287                request_id: request_id.into(),
1288                agent_id: "agent-0".into(),
1289                tool_name: "read_file".into(),
1290                result: "ok".into(),
1291                success: true,
1292                step: 1,
1293            },
1294        }
1295    }
1296}