1use super::{AgentBus, BusEnvelope, BusMessage};
34use crate::a2a::types::Part;
35use crate::secrets;
36use anyhow::{Context, Result};
37use chrono::Utc;
38use minio::s3::builders::ObjectContent;
39use minio::s3::creds::StaticProvider;
40use minio::s3::http::BaseUrl;
41use minio::s3::types::S3Api;
42use minio::s3::{Client as MinioClient, ClientBuilder as MinioClientBuilder};
43use serde::{Deserialize, Serialize};
44use std::collections::BTreeMap;
45use std::sync::Arc;
46use tokio::sync::broadcast;
47use tokio::task;
48use tracing::{debug, error, info, warn};
49
50#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct BusS3SinkConfig {
53 pub endpoint: String,
55 pub access_key: String,
57 pub secret_key: String,
59 pub bucket: String,
61 #[serde(default = "default_prefix")]
63 pub prefix: String,
64 #[serde(default = "default_batch_size")]
66 pub batch_size: usize,
67 #[serde(default = "default_flush_interval_secs")]
69 pub flush_interval_secs: u64,
70 #[serde(default)]
72 pub secure: bool,
73 #[serde(default)]
75 pub ignore_cert: bool,
76}
77
/// Serde default for `BusS3SinkConfig::prefix`.
fn default_prefix() -> String {
    String::from("training/")
}
81
/// Serde default for `BusS3SinkConfig::batch_size`.
fn default_batch_size() -> usize {
    const DEFAULT_BATCH_SIZE: usize = 100;
    DEFAULT_BATCH_SIZE
}
85
/// Serde default for `BusS3SinkConfig::flush_interval_secs`.
fn default_flush_interval_secs() -> u64 {
    const DEFAULT_FLUSH_INTERVAL_SECS: u64 = 30;
    DEFAULT_FLUSH_INTERVAL_SECS
}
89
90impl BusS3SinkConfig {
91 pub fn from_env() -> Result<Self> {
102 let endpoint = std::env::var("MINIO_ENDPOINT")
103 .or_else(|_| std::env::var("CODETETHER_BUS_S3_ENDPOINT"))
104 .context("MINIO_ENDPOINT or CODETETHER_BUS_S3_ENDPOINT required for bus S3 sink")?;
105
106 let access_key = std::env::var("MINIO_ACCESS_KEY")
107 .or_else(|_| std::env::var("CODETETHER_BUS_S3_ACCESS_KEY"))
108 .context("MINIO_ACCESS_KEY or CODETETHER_BUS_S3_ACCESS_KEY required")?;
109
110 let secret_key = std::env::var("MINIO_SECRET_KEY")
111 .or_else(|_| std::env::var("CODETETHER_BUS_S3_SECRET_KEY"))
112 .context("MINIO_SECRET_KEY or CODETETHER_BUS_S3_SECRET_KEY required")?;
113
114 Ok(Self {
115 endpoint,
116 access_key,
117 secret_key,
118 bucket: std::env::var("CODETETHER_BUS_S3_BUCKET")
119 .unwrap_or_else(|_| "codetether-training".to_string()),
120 prefix: std::env::var("CODETETHER_BUS_S3_PREFIX")
121 .unwrap_or_else(|_| "training/".to_string()),
122 batch_size: std::env::var("CODETETHER_BUS_S3_BATCH_SIZE")
123 .ok()
124 .and_then(|s| s.parse().ok())
125 .unwrap_or(100),
126 flush_interval_secs: std::env::var("CODETETHER_BUS_S3_FLUSH_SECS")
127 .ok()
128 .and_then(|s| s.parse().ok())
129 .unwrap_or(30),
130 secure: std::env::var("MINIO_SECURE")
131 .ok()
132 .map(|s| s.to_lowercase() == "true")
133 .unwrap_or(false),
134 ignore_cert: std::env::var("MINIO_IGNORE_CERT")
135 .ok()
136 .map(|s| s.to_lowercase() == "true")
137 .unwrap_or(false),
138 })
139 }
140
141 pub async fn from_env_or_vault() -> Result<Self> {
147 if let Ok(cfg) = Self::from_env() {
149 return Ok(cfg);
150 }
151
152 let endpoint = env_non_empty("CODETETHER_CHAT_SYNC_MINIO_ENDPOINT");
154 let access_key = env_non_empty("CODETETHER_CHAT_SYNC_MINIO_ACCESS_KEY");
155 let secret_key = env_non_empty("CODETETHER_CHAT_SYNC_MINIO_SECRET_KEY");
156
157 if let (Some(ep), Some(ak), Some(sk)) =
158 (endpoint.clone(), access_key.clone(), secret_key.clone())
159 {
160 info!("Bus S3 sink using chat-sync env vars");
161 return Ok(Self {
162 endpoint: ep,
163 access_key: ak,
164 secret_key: sk,
165 bucket: std::env::var("CODETETHER_BUS_S3_BUCKET")
166 .unwrap_or_else(|_| "codetether-training".to_string()),
167 prefix: std::env::var("CODETETHER_BUS_S3_PREFIX")
168 .unwrap_or_else(|_| "training/".to_string()),
169 batch_size: 100,
170 flush_interval_secs: 30,
171 secure: false,
172 ignore_cert: false,
173 });
174 }
175
176 if let Some(secrets) = secrets::get_provider_secrets("chat-sync-minio").await {
178 let ep = secrets
179 .base_url
180 .clone()
181 .or_else(|| vault_extra_str(&secrets, &["endpoint", "minio_endpoint", "url"]))
182 .filter(|s| !s.is_empty());
183 let ak = vault_extra_str(
184 &secrets,
185 &["access_key", "access_key_id", "minio_access_key"],
186 )
187 .or_else(|| secrets.api_key.clone())
188 .filter(|s| !s.is_empty());
189 let sk = vault_extra_str(
190 &secrets,
191 &["secret_key", "secret_access_key", "minio_secret_key"],
192 )
193 .filter(|s| !s.is_empty());
194
195 if let (Some(ep), Some(ak), Some(sk)) = (ep, ak, sk) {
196 info!("Bus S3 sink using Vault chat-sync-minio credentials");
197 return Ok(Self {
198 endpoint: ep,
199 access_key: ak,
200 secret_key: sk,
201 bucket: std::env::var("CODETETHER_BUS_S3_BUCKET")
202 .unwrap_or_else(|_| "codetether-training".to_string()),
203 prefix: std::env::var("CODETETHER_BUS_S3_PREFIX")
204 .unwrap_or_else(|_| "training/".to_string()),
205 batch_size: 100,
206 flush_interval_secs: 30,
207 secure: false,
208 ignore_cert: false,
209 });
210 }
211 }
212
213 anyhow::bail!(
214 "No MinIO credentials found. Set MINIO_ENDPOINT/MINIO_ACCESS_KEY/MINIO_SECRET_KEY, \
215 CODETETHER_CHAT_SYNC_MINIO_* env vars, or configure chat-sync-minio in Vault."
216 )
217 }
218}
219
/// Read environment variable `key`, treating "set but empty" the same as unset.
fn env_non_empty(key: &str) -> Option<String> {
    match std::env::var(key) {
        Ok(value) if !value.is_empty() => Some(value),
        _ => None,
    }
}
224
225fn vault_extra_str(secrets: &secrets::ProviderSecrets, keys: &[&str]) -> Option<String> {
227 for key in keys {
228 if let Some(val) = secrets.extra.get(*key)
229 && let Some(s) = val.as_str()
230 && !s.is_empty()
231 {
232 return Some(s.to_string());
233 }
234 }
235 None
236}
237
238#[derive(Debug, Clone, Serialize, Deserialize)]
246struct TrainingRecord {
247 role: String,
249 #[serde(skip_serializing_if = "Option::is_none")]
251 content: Option<String>,
252 #[serde(skip_serializing_if = "Option::is_none")]
254 tool_calls: Option<Vec<TrainingToolCall>>,
255 #[serde(skip_serializing_if = "Option::is_none")]
257 tool_call_id: Option<String>,
258 #[serde(skip_serializing_if = "Option::is_none")]
260 name: Option<String>,
261 metadata: TrainingMetadata,
263}
264
265#[derive(Debug, Clone, Serialize, Deserialize)]
267struct TrainingToolCall {
268 id: String,
270 #[serde(rename = "type")]
272 call_type: String,
273 function: TrainingFunction,
275}
276
277#[derive(Debug, Clone, Serialize, Deserialize)]
279struct TrainingFunction {
280 name: String,
281 arguments: String,
282}
283
284#[derive(Debug, Clone, Serialize, Deserialize)]
286struct TrainingMetadata {
287 bus_kind: String,
289 envelope_id: String,
291 timestamp: String,
293 topic: String,
295 sender_id: String,
297 #[serde(skip_serializing_if = "Option::is_none")]
299 correlation_id: Option<String>,
300 #[serde(skip_serializing_if = "Option::is_none")]
302 step: Option<usize>,
303}
304
305fn envelope_step(message: &BusMessage) -> Option<usize> {
307 match message {
308 BusMessage::ToolRequest { step, .. }
309 | BusMessage::ToolResponse { step, .. }
310 | BusMessage::ToolOutputFull { step, .. }
311 | BusMessage::AgentThinking { step, .. }
312 | BusMessage::RalphLearning {
313 iteration: step, ..
314 }
315 | BusMessage::RalphProgress {
316 iteration: step, ..
317 } => Some(*step),
318 BusMessage::AgentReady { .. }
319 | BusMessage::AgentShutdown { .. }
320 | BusMessage::AgentMessage { .. }
321 | BusMessage::TaskUpdate { .. }
322 | BusMessage::ArtifactUpdate { .. }
323 | BusMessage::SharedResult { .. }
324 | BusMessage::Heartbeat { .. }
325 | BusMessage::RalphHandoff { .. }
326 | BusMessage::VoiceSessionStarted { .. }
327 | BusMessage::VoiceTranscript { .. }
328 | BusMessage::VoiceAgentStateChanged { .. }
329 | BusMessage::VoiceSessionEnded { .. } => None,
330 }
331}
332
333fn envelope_to_training_record(env: &BusEnvelope) -> TrainingRecord {
334 let meta = TrainingMetadata {
335 bus_kind: bus_message_kind(&env.message),
336 envelope_id: env.id.clone(),
337 timestamp: env.timestamp.to_rfc3339(),
338 topic: env.topic.clone(),
339 sender_id: env.sender_id.clone(),
340 correlation_id: env.correlation_id.clone(),
341 step: envelope_step(&env.message),
342 };
343
344 match &env.message {
345 BusMessage::AgentReady {
347 agent_id,
348 capabilities,
349 } => TrainingRecord {
350 role: "system".into(),
351 content: Some(format!(
352 "Agent `{agent_id}` ready. Capabilities: {}",
353 capabilities.join(", ")
354 )),
355 tool_calls: None,
356 tool_call_id: None,
357 name: None,
358 metadata: meta,
359 },
360
361 BusMessage::AgentShutdown { agent_id } => TrainingRecord {
362 role: "system".into(),
363 content: Some(format!("Agent `{agent_id}` shutting down.")),
364 tool_calls: None,
365 tool_call_id: None,
366 name: None,
367 metadata: meta,
368 },
369
370 BusMessage::AgentMessage { from, to, parts } => {
372 let text = parts_to_text(parts);
373 TrainingRecord {
374 role: "assistant".into(),
375 content: Some(format!("[{from} → {to}] {text}")),
376 tool_calls: None,
377 tool_call_id: None,
378 name: None,
379 metadata: meta,
380 }
381 }
382
383 BusMessage::TaskUpdate {
385 task_id,
386 state,
387 message,
388 } => {
389 let msg = message.as_deref().unwrap_or("");
390 TrainingRecord {
391 role: "system".into(),
392 content: Some(format!("Task `{task_id}` → {state:?}. {msg}")),
393 tool_calls: None,
394 tool_call_id: None,
395 name: None,
396 metadata: meta,
397 }
398 }
399
400 BusMessage::ArtifactUpdate { task_id, artifact } => {
401 let artifact_text = parts_to_text(&artifact.parts);
402 TrainingRecord {
403 role: "system".into(),
404 content: Some(format!(
405 "Task `{task_id}` artifact `{}`: {artifact_text}",
406 artifact.artifact_id
407 )),
408 tool_calls: None,
409 tool_call_id: None,
410 name: None,
411 metadata: meta,
412 }
413 }
414
415 BusMessage::SharedResult { key, value, tags } => TrainingRecord {
417 role: "system".into(),
418 content: Some(format!(
419 "Shared result `{key}` [{}]: {}",
420 tags.join(", "),
421 serde_json::to_string(value).unwrap_or_default()
422 )),
423 tool_calls: None,
424 tool_call_id: None,
425 name: None,
426 metadata: meta,
427 },
428
429 BusMessage::ToolRequest {
431 request_id,
432 tool_name,
433 arguments,
434 ..
435 } => TrainingRecord {
436 role: "assistant".into(),
437 content: None,
438 tool_calls: Some(vec![TrainingToolCall {
439 id: request_id.clone(),
440 call_type: "function".into(),
441 function: TrainingFunction {
442 name: tool_name.clone(),
443 arguments: serde_json::to_string(arguments).unwrap_or_default(),
444 },
445 }]),
446 tool_call_id: None,
447 name: None,
448 metadata: meta,
449 },
450
451 BusMessage::ToolResponse {
453 request_id,
454 tool_name,
455 result,
456 success,
457 ..
458 } => TrainingRecord {
459 role: "tool".into(),
460 content: Some(if *success {
461 result.clone()
462 } else {
463 format!("[ERROR] {result}")
464 }),
465 tool_calls: None,
466 tool_call_id: Some(request_id.clone()),
467 name: Some(tool_name.clone()),
468 metadata: meta,
469 },
470
471 BusMessage::ToolOutputFull {
473 agent_id,
474 tool_name,
475 output,
476 success,
477 step,
478 } => TrainingRecord {
479 role: "tool".into(),
480 content: Some(if *success {
481 format!("[step {step}, agent {agent_id}] {output}")
482 } else {
483 format!("[step {step}, agent {agent_id}, ERROR] {output}")
484 }),
485 tool_calls: None,
486 tool_call_id: None,
487 name: Some(tool_name.clone()),
488 metadata: meta,
489 },
490
491 BusMessage::Heartbeat { .. } => TrainingRecord {
493 role: "system".into(),
494 content: None,
495 tool_calls: None,
496 tool_call_id: None,
497 name: None,
498 metadata: meta,
499 },
500
501 BusMessage::RalphLearning {
503 prd_id,
504 story_id,
505 iteration,
506 learnings,
507 context,
508 } => TrainingRecord {
509 role: "system".into(),
510 content: Some(format!(
511 "Ralph learning (PRD {prd_id}, story {story_id}, iter {iteration}):\n{}\nContext: {}",
512 learnings
513 .iter()
514 .map(|l| format!("- {l}"))
515 .collect::<Vec<_>>()
516 .join("\n"),
517 serde_json::to_string(context).unwrap_or_default()
518 )),
519 tool_calls: None,
520 tool_call_id: None,
521 name: None,
522 metadata: meta,
523 },
524
525 BusMessage::RalphHandoff {
526 prd_id,
527 from_story,
528 to_story,
529 context,
530 progress_summary,
531 } => TrainingRecord {
532 role: "system".into(),
533 content: Some(format!(
534 "Ralph handoff (PRD {prd_id}): {from_story} → {to_story}\nSummary: {progress_summary}\nContext: {}",
535 serde_json::to_string(context).unwrap_or_default()
536 )),
537 tool_calls: None,
538 tool_call_id: None,
539 name: None,
540 metadata: meta,
541 },
542
543 BusMessage::RalphProgress {
544 prd_id,
545 passed,
546 total,
547 iteration,
548 status,
549 } => TrainingRecord {
550 role: "system".into(),
551 content: Some(format!(
552 "Ralph progress (PRD {prd_id}): {passed}/{total} stories passed, iter {iteration}, status: {status}"
553 )),
554 tool_calls: None,
555 tool_call_id: None,
556 name: None,
557 metadata: meta,
558 },
559
560 BusMessage::AgentThinking {
562 agent_id,
563 thinking,
564 step,
565 } => TrainingRecord {
566 role: "assistant".into(),
567 content: Some(format!("<thinking>\n{thinking}\n</thinking>")),
568 tool_calls: None,
569 tool_call_id: None,
570 name: Some(format!("reasoning.{agent_id}.step_{step}")),
571 metadata: meta,
572 },
573
574 BusMessage::VoiceSessionStarted {
576 room_name,
577 agent_id,
578 voice_id,
579 } => TrainingRecord {
580 role: "system".into(),
581 content: Some(format!(
582 "Voice session started: room={room_name}, agent={agent_id}, voice={voice_id}"
583 )),
584 tool_calls: None,
585 tool_call_id: None,
586 name: None,
587 metadata: meta,
588 },
589
590 BusMessage::VoiceTranscript {
591 room_name,
592 text,
593 role,
594 is_final,
595 } => TrainingRecord {
596 role: if role == "user" {
597 "user".into()
598 } else {
599 "assistant".into()
600 },
601 content: Some(format!(
602 "[voice:{room_name}{}] {text}",
603 if *is_final { " final" } else { "" }
604 )),
605 tool_calls: None,
606 tool_call_id: None,
607 name: None,
608 metadata: meta,
609 },
610
611 BusMessage::VoiceAgentStateChanged { room_name, state } => TrainingRecord {
612 role: "system".into(),
613 content: Some(format!("Voice agent state: room={room_name} → {state}")),
614 tool_calls: None,
615 tool_call_id: None,
616 name: None,
617 metadata: meta,
618 },
619
620 BusMessage::VoiceSessionEnded { room_name, reason } => TrainingRecord {
621 role: "system".into(),
622 content: Some(format!(
623 "Voice session ended: room={room_name}, reason={reason}"
624 )),
625 tool_calls: None,
626 tool_call_id: None,
627 name: None,
628 metadata: meta,
629 },
630 }
631}
632
633type ToolGroupKey = (String, usize, Option<String>);
634
635fn collect_training_records(envelopes: &[BusEnvelope]) -> Vec<TrainingRecord> {
636 let mut grouped_records: BTreeMap<ToolGroupKey, Vec<TrainingRecord>> = BTreeMap::new();
637 let mut passthrough_records = Vec::new();
638
639 for env in envelopes {
640 if matches!(env.message, BusMessage::Heartbeat { .. }) {
641 continue;
642 }
643 let record = envelope_to_training_record(env);
644 if let Some(step) = record.metadata.step
645 && matches!(
646 env.message,
647 BusMessage::ToolRequest { .. } | BusMessage::ToolResponse { .. }
648 )
649 {
650 grouped_records
651 .entry((
652 record.metadata.sender_id.clone(),
653 step,
654 record.metadata.correlation_id.clone(),
655 ))
656 .or_default()
657 .push(record);
658 continue;
659 }
660 passthrough_records.push(record);
661 }
662
663 let mut records = passthrough_records;
664 for (_key, mut group) in grouped_records {
665 append_training_group(&mut group, &mut records);
666 }
667 records
668}
669
670fn append_training_group(records: &mut [TrainingRecord], output: &mut Vec<TrainingRecord>) {
671 if records.is_empty() {
672 return;
673 }
674
675 let assistant_prefix_len = records
676 .iter()
677 .take_while(|record| {
678 record.role == "assistant"
679 && record
680 .tool_calls
681 .as_ref()
682 .is_some_and(|calls| calls.len() == 1)
683 })
684 .count();
685 let tool_suffix_len = records[assistant_prefix_len..]
686 .iter()
687 .take_while(|record| record.role == "tool" && record.tool_call_id.is_some())
688 .count();
689 let can_merge = assistant_prefix_len > 0
690 && tool_suffix_len > 0
691 && assistant_prefix_len + tool_suffix_len == records.len();
692
693 if can_merge {
694 let mut merged = records[0].clone();
695 let mut tool_calls = Vec::with_capacity(assistant_prefix_len);
696 let mut envelope_ids = Vec::with_capacity(assistant_prefix_len);
697 let mut contents = Vec::new();
698
699 for record in records.iter().take(assistant_prefix_len) {
700 envelope_ids.push(record.metadata.envelope_id.clone());
701 if let Some(content) = record
702 .content
703 .as_ref()
704 .filter(|content| !content.is_empty())
705 {
706 contents.push(content.clone());
707 }
708 if let Some(mut calls) = record.tool_calls.clone() {
709 tool_calls.append(&mut calls);
710 }
711 }
712
713 merged.tool_calls = Some(tool_calls);
714 merged.content = if contents.is_empty() {
715 None
716 } else {
717 Some(contents.join("\n"))
718 };
719 merged.metadata.bus_kind = "tool_request_batch".into();
720 merged.metadata.envelope_id = envelope_ids.join(",");
721
722 output.push(merged);
723 output.extend(records.iter().skip(assistant_prefix_len).cloned());
724 return;
725 }
726
727 output.extend(records.iter().cloned());
728}
729
730fn serialize_training_records(records: &[TrainingRecord]) -> Vec<String> {
731 records
732 .iter()
733 .filter_map(|record| serde_json::to_string(record).ok())
734 .collect()
735}
736
737fn build_s3_key(prefix: &str, now: chrono::DateTime<Utc>) -> String {
738 let prefix = if prefix.is_empty() {
739 String::new()
740 } else if prefix.ends_with('/') {
741 prefix.to_string()
742 } else {
743 format!("{prefix}/")
744 };
745 let date_path = now.format("%Y/%m/%d/%H").to_string();
746 let timestamp = now.format("%Y%m%dT%H%M%S").to_string();
747 let uuid = uuid::Uuid::new_v4();
748 format!("{prefix}v2/{date_path}/batch_{timestamp}_{uuid}.jsonl")
749}
750
751fn bus_message_kind(msg: &BusMessage) -> String {
753 serde_json::to_value(msg)
754 .ok()
755 .and_then(|v| v.get("kind").and_then(|k| k.as_str()).map(String::from))
756 .unwrap_or_else(|| "unknown".into())
757}
758
759fn parts_to_text(parts: &[Part]) -> String {
761 parts
762 .iter()
763 .map(|p| match p {
764 Part::Text { text } => text.as_str(),
765 Part::Data { .. } => "<<data>>",
766 Part::File { .. } => "<<file>>",
767 })
768 .collect::<Vec<_>>()
769 .join("\n")
770}
771
772pub struct BusS3Sink {
776 #[allow(dead_code)]
777 bus: Arc<AgentBus>,
778 client: MinioClient,
779 config: BusS3SinkConfig,
780 rx: broadcast::Receiver<BusEnvelope>,
781}
782
783impl BusS3Sink {
784 #[allow(dead_code)]
785 pub async fn new(
787 bus: Arc<AgentBus>,
788 endpoint: &str,
789 access_key: &str,
790 secret_key: &str,
791 bucket: &str,
792 prefix: &str,
793 ) -> Result<Self> {
794 let config = BusS3SinkConfig {
795 endpoint: endpoint.to_string(),
796 access_key: access_key.to_string(),
797 secret_key: secret_key.to_string(),
798 bucket: bucket.to_string(),
799 prefix: prefix.to_string(),
800 batch_size: 100,
801 flush_interval_secs: 30,
802 secure: endpoint.starts_with("https"),
803 ignore_cert: false,
804 };
805
806 Self::from_config(bus, config).await
807 }
808
809 pub async fn from_config(bus: Arc<AgentBus>, config: BusS3SinkConfig) -> Result<Self> {
811 let endpoint = normalize_endpoint(&config.endpoint, config.secure);
812
813 let base_url: BaseUrl = endpoint.parse().context("Invalid MinIO endpoint URL")?;
814
815 let static_provider = StaticProvider::new(&config.access_key, &config.secret_key, None);
816
817 let client = MinioClientBuilder::new(base_url)
818 .provider(Some(Box::new(static_provider)))
819 .ignore_cert_check(Some(config.ignore_cert))
820 .build()?;
821
822 let rx = bus.tx.subscribe();
823
824 Ok(Self {
825 bus,
826 client,
827 config,
828 rx,
829 })
830 }
831
832 #[allow(dead_code)]
833 pub async fn from_env(bus: Arc<AgentBus>) -> Result<Self> {
835 let config = BusS3SinkConfig::from_env()?;
836 Self::from_config(bus, config).await
837 }
838
839 pub async fn ensure_bucket(&self) -> Result<()> {
841 match self.client.bucket_exists(&self.config.bucket).send().await {
842 Ok(resp) if resp.exists => {
843 debug!(bucket = %self.config.bucket, "S3 bucket exists");
844 }
845 Ok(_) => {
846 info!(bucket = %self.config.bucket, "Creating S3 bucket");
847 match self.client.create_bucket(&self.config.bucket).send().await {
848 Ok(_) => {}
849 Err(e) => {
850 let err_text = e.to_string();
851 if !err_text.contains("BucketAlreadyOwnedByYou")
852 && !err_text.contains("BucketAlreadyExists")
853 {
854 return Err(anyhow::anyhow!("Failed to create bucket: {err_text}"));
855 }
856 debug!(bucket = %self.config.bucket, "Bucket already exists");
857 }
858 }
859 }
860 Err(e) => {
861 debug!(error = %e, bucket = %self.config.bucket, "Bucket check returned error (may already exist)");
862 }
863 }
864 Ok(())
865 }
866
867 pub async fn run(mut self) -> Result<()> {
869 self.ensure_bucket().await?;
870
871 info!(
872 bucket = %self.config.bucket,
873 prefix = %self.config.prefix,
874 batch_size = self.config.batch_size,
875 flush_secs = self.config.flush_interval_secs,
876 "Bus S3 sink started (JSONL training record format)"
877 );
878
879 let mut batch: Vec<BusEnvelope> = Vec::with_capacity(self.config.batch_size);
880 let mut batch_start: Option<String> = None;
881 let mut flush_interval = tokio::time::interval(std::time::Duration::from_secs(
882 self.config.flush_interval_secs,
883 ));
884
885 loop {
886 tokio::select! {
887 result = self.rx.recv() => {
888 match result {
889 Ok(envelope) => {
890 if batch_start.is_none() {
891 batch_start = Some(envelope.timestamp.to_rfc3339());
892 }
893 batch.push(envelope);
894
895 if batch.len() >= self.config.batch_size {
896 if let Err(e) = self.flush_batch(&mut batch, &mut batch_start).await {
897 error!(error = %e, "Failed to flush batch");
898 }
899 task::yield_now().await;
901 }
902 }
903 Err(broadcast::error::RecvError::Lagged(n)) => {
904 warn!(skipped = n, "Bus S3 sink lagged, some messages dropped");
905 }
906 Err(broadcast::error::RecvError::Closed) => {
907 info!("Bus channel closed, shutting down S3 sink");
908 if !batch.is_empty()
909 && let Err(e) = self.flush_batch(&mut batch, &mut batch_start).await {
910 error!(error = %e, "Failed to flush final batch");
911 }
912 return Ok(());
913 }
914 }
915 }
916
917 _ = flush_interval.tick() => {
918 if !batch.is_empty() {
919 if let Err(e) = self.flush_batch(&mut batch, &mut batch_start).await {
920 error!(error = %e, "Failed to flush batch on interval");
921 }
922 task::yield_now().await;
924 }
925 }
926 }
927 }
928 }
929
930 async fn flush_batch(
936 &self,
937 batch: &mut Vec<BusEnvelope>,
938 batch_start: &mut Option<String>,
939 ) -> Result<()> {
940 if batch.is_empty() {
941 return Ok(());
942 }
943
944 let _start_time = batch_start
945 .take()
946 .unwrap_or_else(|| Utc::now().to_rfc3339());
947 let envelopes = std::mem::take(batch);
948
949 let records = collect_training_records(&envelopes);
954 let lines = serialize_training_records(&records);
955 if lines.is_empty() {
956 return Ok(());
957 }
958
959 let count = lines.len();
960 let jsonl = lines.join("\n");
961
962 let now = Utc::now();
964 let s3_key = build_s3_key(&self.config.prefix, now);
965
966 let content = ObjectContent::from(jsonl.into_bytes());
967
968 match self
969 .client
970 .put_object_content(&self.config.bucket, &s3_key, content)
971 .send()
972 .await
973 {
974 Ok(_) => {
975 info!(
976 bucket = %self.config.bucket,
977 key = %s3_key,
978 records = count,
979 "Uploaded training records to S3"
980 );
981 }
982 Err(e) => {
983 error!(
984 bucket = %self.config.bucket,
985 key = %s3_key,
986 error = %e,
987 "Failed to upload training records to S3"
988 );
989 return Err(anyhow::anyhow!("S3 upload failed: {e}"));
990 }
991 }
992
993 Ok(())
994 }
995
996 #[allow(dead_code)]
997 pub fn bucket(&self) -> &str {
999 &self.config.bucket
1000 }
1001
1002 #[allow(dead_code)]
1003 pub fn prefix(&self) -> &str {
1005 &self.config.prefix
1006 }
1007}
1008
/// Strip trailing slashes from `endpoint` and make sure it has a scheme.
///
/// An endpoint that already starts with `http://` or `https://` is kept as
/// written and `secure` is ignored; otherwise `secure` selects between an
/// `https://` and an `http://` prefix.
fn normalize_endpoint(endpoint: &str, secure: bool) -> String {
    let trimmed = endpoint.trim_end_matches('/');
    let has_scheme = ["http://", "https://"]
        .iter()
        .any(|scheme| trimmed.starts_with(*scheme));

    match (has_scheme, secure) {
        (true, _) => trimmed.to_owned(),
        (false, true) => format!("https://{trimmed}"),
        (false, false) => format!("http://{trimmed}"),
    }
}
1021
1022pub fn spawn_bus_s3_sink(bus: Arc<AgentBus>) -> tokio::task::JoinHandle<()> {
1030 tokio::spawn(async move {
1031 match BusS3SinkConfig::from_env_or_vault().await {
1032 Ok(config) => match BusS3Sink::from_config(bus, config).await {
1033 Ok(sink) => {
1034 if let Err(e) = sink.run().await {
1035 error!(error = %e, "Bus S3 sink failed");
1036 }
1037 }
1038 Err(e) => {
1039 error!(error = %e, "Bus S3 sink failed to initialize");
1040 }
1041 },
1042 Err(e) => {
1043 warn!(
1044 error = %e,
1045 "Bus S3 sink not configured - set MINIO_*/CODETETHER_CHAT_SYNC_MINIO_* env vars or configure chat-sync-minio in Vault"
1046 );
1047 }
1048 }
1049 })
1050}
1051
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::Value;

    /// Build an envelope stamped with the current time.
    fn envelope(
        id: &str,
        topic: &str,
        sender_id: &str,
        correlation_id: Option<&str>,
        message: BusMessage,
    ) -> BusEnvelope {
        BusEnvelope {
            id: id.into(),
            topic: topic.into(),
            sender_id: sender_id.into(),
            correlation_id: correlation_id.map(str::to_string),
            timestamp: Utc::now(),
            message,
        }
    }

    /// `read_file` request from `agent-0` at step 1 with null arguments.
    fn tool_request_envelope(
        envelope_id: &str,
        request_id: &str,
        correlation_id: Option<&str>,
    ) -> BusEnvelope {
        envelope(
            envelope_id,
            "tools.read_file",
            "agent-0",
            correlation_id,
            BusMessage::ToolRequest {
                request_id: request_id.into(),
                agent_id: "agent-0".into(),
                tool_name: "read_file".into(),
                arguments: Value::Null,
                step: 1,
            },
        )
    }

    /// Successful `read_file` response from `agent-0` at step 1.
    fn tool_response_envelope(
        envelope_id: &str,
        request_id: &str,
        correlation_id: Option<&str>,
    ) -> BusEnvelope {
        envelope(
            envelope_id,
            "tools.read_file",
            "agent-0",
            correlation_id,
            BusMessage::ToolResponse {
                request_id: request_id.into(),
                agent_id: "agent-0".into(),
                tool_name: "read_file".into(),
                result: "ok".into(),
                success: true,
                step: 1,
            },
        )
    }

    /// Records tagged as merged tool-request batches, in output order.
    fn request_batches(records: &[TrainingRecord]) -> Vec<&TrainingRecord> {
        records
            .iter()
            .filter(|record| record.metadata.bus_kind == "tool_request_batch")
            .collect()
    }

    #[test]
    fn test_normalize_endpoint() {
        let cases = [
            ("localhost:9000", false, "http://localhost:9000"),
            ("localhost:9000", true, "https://localhost:9000"),
            ("http://localhost:9000/", false, "http://localhost:9000"),
            ("https://minio.example.com/", true, "https://minio.example.com"),
        ];
        for (input, secure, expected) in cases {
            assert_eq!(normalize_endpoint(input, secure), expected);
        }
    }

    #[test]
    fn test_config_defaults() {
        let config = BusS3SinkConfig {
            endpoint: "http://localhost:9000".to_string(),
            access_key: "key".to_string(),
            secret_key: "secret".to_string(),
            bucket: "test".to_string(),
            prefix: default_prefix(),
            batch_size: default_batch_size(),
            flush_interval_secs: default_flush_interval_secs(),
            secure: false,
            ignore_cert: false,
        };

        assert_eq!(config.prefix, "training/");
        assert_eq!(config.batch_size, 100);
        assert_eq!(config.flush_interval_secs, 30);
    }

    #[test]
    fn test_training_record_tool_request() {
        let env = envelope(
            "env-1",
            "tools.read_file",
            "agent-0",
            Some("corr-1"),
            BusMessage::ToolRequest {
                request_id: "req-1".into(),
                agent_id: "agent-0".into(),
                tool_name: "read_file".into(),
                arguments: serde_json::json!({"path": "/src/main.rs"}),
                step: 1,
            },
        );

        let record = envelope_to_training_record(&env);
        assert_eq!(record.role, "assistant");
        assert!(record.content.is_none());
        assert_eq!(record.metadata.bus_kind, "tool_request");

        let calls = record.tool_calls.unwrap();
        assert_eq!(calls.len(), 1);
        assert_eq!(calls[0].function.name, "read_file");
        assert_eq!(calls[0].call_type, "function");
    }

    #[test]
    fn test_training_record_tool_response() {
        let env = envelope(
            "env-2",
            "tools.read_file",
            "agent-0",
            Some("corr-1"),
            BusMessage::ToolResponse {
                request_id: "req-1".into(),
                agent_id: "agent-0".into(),
                tool_name: "read_file".into(),
                result: "fn main() {}".into(),
                success: true,
                step: 1,
            },
        );

        let record = envelope_to_training_record(&env);
        assert_eq!(record.role, "tool");
        assert_eq!(record.content.as_deref(), Some("fn main() {}"));
        assert_eq!(record.tool_call_id.as_deref(), Some("req-1"));
        assert_eq!(record.name.as_deref(), Some("read_file"));
        assert_eq!(record.metadata.bus_kind, "tool_response");
    }

    #[test]
    fn test_training_record_agent_message() {
        let env = envelope(
            "env-3",
            "agent.planner",
            "coder",
            None,
            BusMessage::AgentMessage {
                from: "coder".into(),
                to: "planner".into(),
                parts: vec![Part::Text {
                    text: "I fixed the bug".into(),
                }],
            },
        );

        let record = envelope_to_training_record(&env);
        assert_eq!(record.role, "assistant");

        let content = record.content.as_deref().unwrap();
        assert!(content.contains("I fixed the bug"));
        assert!(content.contains("[coder → planner]"));
    }

    #[test]
    fn test_heartbeat_skipped_role() {
        let env = envelope(
            "env-4",
            "broadcast",
            "agent-0",
            None,
            BusMessage::Heartbeat {
                agent_id: "agent-0".into(),
                status: "ok".into(),
            },
        );

        let record = envelope_to_training_record(&env);
        assert_eq!(record.role, "system");
        assert!(record.content.is_none());
    }

    #[test]
    fn test_tool_grouping_keeps_different_correlation_ids_separate() {
        let envelopes = vec![
            tool_request_envelope("env-1", "req-1", Some("turn-1")),
            tool_response_envelope("env-2", "req-1", Some("turn-1")),
            tool_request_envelope("env-3", "req-2", Some("turn-2")),
            tool_response_envelope("env-4", "req-2", Some("turn-2")),
        ];

        let records = collect_training_records(&envelopes);
        let assistant_batches = request_batches(&records);

        assert_eq!(records.len(), 4);
        assert_eq!(assistant_batches.len(), 2);
        assert_eq!(
            assistant_batches[0].metadata.correlation_id.as_deref(),
            Some("turn-1")
        );
        assert_eq!(
            assistant_batches[1].metadata.correlation_id.as_deref(),
            Some("turn-2")
        );
        assert_eq!(assistant_batches[0].tool_calls.as_ref().unwrap().len(), 1);
        assert_eq!(assistant_batches[1].tool_calls.as_ref().unwrap().len(), 1);
    }

    #[test]
    fn test_tool_grouping_keeps_legacy_none_correlation_compatible() {
        let envelopes = vec![
            tool_request_envelope("env-1", "req-1", None),
            tool_request_envelope("env-2", "req-2", None),
            tool_response_envelope("env-3", "req-1", None),
            tool_response_envelope("env-4", "req-2", None),
        ];

        let records = collect_training_records(&envelopes);
        let assistant_batches = request_batches(&records);

        assert_eq!(records.len(), 3);
        assert_eq!(assistant_batches.len(), 1);
        assert!(assistant_batches[0].metadata.correlation_id.is_none());
        assert_eq!(assistant_batches[0].tool_calls.as_ref().unwrap().len(), 2);
    }
}