1use super::{AgentBus, BusEnvelope, BusMessage};
34use crate::a2a::types::Part;
35use crate::secrets;
36use anyhow::{Context, Result};
37use chrono::Utc;
38use minio::s3::builders::ObjectContent;
39use minio::s3::creds::StaticProvider;
40use minio::s3::http::BaseUrl;
41use minio::s3::types::S3Api;
42use minio::s3::{Client as MinioClient, ClientBuilder as MinioClientBuilder};
43use serde::{Deserialize, Serialize};
44use std::sync::Arc;
45use tokio::sync::broadcast;
46use tokio::task;
47use tracing::{debug, error, info, warn};
48
49#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct BusS3SinkConfig {
52 pub endpoint: String,
54 pub access_key: String,
56 pub secret_key: String,
58 pub bucket: String,
60 #[serde(default = "default_prefix")]
62 pub prefix: String,
63 #[serde(default = "default_batch_size")]
65 pub batch_size: usize,
66 #[serde(default = "default_flush_interval_secs")]
68 pub flush_interval_secs: u64,
69 #[serde(default)]
71 pub secure: bool,
72 #[serde(default)]
74 pub ignore_cert: bool,
75}
76
/// Serde default for [`BusS3SinkConfig::prefix`].
fn default_prefix() -> String {
    String::from("training/")
}
80
/// Serde default for [`BusS3SinkConfig::batch_size`].
fn default_batch_size() -> usize {
    const DEFAULT_BATCH_SIZE: usize = 100;
    DEFAULT_BATCH_SIZE
}
84
/// Serde default for [`BusS3SinkConfig::flush_interval_secs`].
fn default_flush_interval_secs() -> u64 {
    const DEFAULT_FLUSH_INTERVAL_SECS: u64 = 30;
    DEFAULT_FLUSH_INTERVAL_SECS
}
88
89impl BusS3SinkConfig {
90 pub fn from_env() -> Result<Self> {
101 let endpoint = std::env::var("MINIO_ENDPOINT")
102 .or_else(|_| std::env::var("CODETETHER_BUS_S3_ENDPOINT"))
103 .context("MINIO_ENDPOINT or CODETETHER_BUS_S3_ENDPOINT required for bus S3 sink")?;
104
105 let access_key = std::env::var("MINIO_ACCESS_KEY")
106 .or_else(|_| std::env::var("CODETETHER_BUS_S3_ACCESS_KEY"))
107 .context("MINIO_ACCESS_KEY or CODETETHER_BUS_S3_ACCESS_KEY required")?;
108
109 let secret_key = std::env::var("MINIO_SECRET_KEY")
110 .or_else(|_| std::env::var("CODETETHER_BUS_S3_SECRET_KEY"))
111 .context("MINIO_SECRET_KEY or CODETETHER_BUS_S3_SECRET_KEY required")?;
112
113 Ok(Self {
114 endpoint,
115 access_key,
116 secret_key,
117 bucket: std::env::var("CODETETHER_BUS_S3_BUCKET")
118 .unwrap_or_else(|_| "codetether-training".to_string()),
119 prefix: std::env::var("CODETETHER_BUS_S3_PREFIX")
120 .unwrap_or_else(|_| "training/".to_string()),
121 batch_size: std::env::var("CODETETHER_BUS_S3_BATCH_SIZE")
122 .ok()
123 .and_then(|s| s.parse().ok())
124 .unwrap_or(100),
125 flush_interval_secs: std::env::var("CODETETHER_BUS_S3_FLUSH_SECS")
126 .ok()
127 .and_then(|s| s.parse().ok())
128 .unwrap_or(30),
129 secure: std::env::var("MINIO_SECURE")
130 .ok()
131 .map(|s| s.to_lowercase() == "true")
132 .unwrap_or(false),
133 ignore_cert: std::env::var("MINIO_IGNORE_CERT")
134 .ok()
135 .map(|s| s.to_lowercase() == "true")
136 .unwrap_or(false),
137 })
138 }
139
140 pub async fn from_env_or_vault() -> Result<Self> {
146 if let Ok(cfg) = Self::from_env() {
148 return Ok(cfg);
149 }
150
151 let endpoint = env_non_empty("CODETETHER_CHAT_SYNC_MINIO_ENDPOINT");
153 let access_key = env_non_empty("CODETETHER_CHAT_SYNC_MINIO_ACCESS_KEY");
154 let secret_key = env_non_empty("CODETETHER_CHAT_SYNC_MINIO_SECRET_KEY");
155
156 if let (Some(ep), Some(ak), Some(sk)) =
157 (endpoint.clone(), access_key.clone(), secret_key.clone())
158 {
159 info!("Bus S3 sink using chat-sync env vars");
160 return Ok(Self {
161 endpoint: ep,
162 access_key: ak,
163 secret_key: sk,
164 bucket: std::env::var("CODETETHER_BUS_S3_BUCKET")
165 .unwrap_or_else(|_| "codetether-training".to_string()),
166 prefix: std::env::var("CODETETHER_BUS_S3_PREFIX")
167 .unwrap_or_else(|_| "training/".to_string()),
168 batch_size: 100,
169 flush_interval_secs: 30,
170 secure: false,
171 ignore_cert: false,
172 });
173 }
174
175 if let Some(secrets) = secrets::get_provider_secrets("chat-sync-minio").await {
177 let ep = secrets
178 .base_url
179 .clone()
180 .or_else(|| vault_extra_str(&secrets, &["endpoint", "minio_endpoint", "url"]))
181 .filter(|s| !s.is_empty());
182 let ak = vault_extra_str(
183 &secrets,
184 &["access_key", "access_key_id", "minio_access_key"],
185 )
186 .or_else(|| secrets.api_key.clone())
187 .filter(|s| !s.is_empty());
188 let sk = vault_extra_str(
189 &secrets,
190 &["secret_key", "secret_access_key", "minio_secret_key"],
191 )
192 .filter(|s| !s.is_empty());
193
194 if let (Some(ep), Some(ak), Some(sk)) = (ep, ak, sk) {
195 info!("Bus S3 sink using Vault chat-sync-minio credentials");
196 return Ok(Self {
197 endpoint: ep,
198 access_key: ak,
199 secret_key: sk,
200 bucket: std::env::var("CODETETHER_BUS_S3_BUCKET")
201 .unwrap_or_else(|_| "codetether-training".to_string()),
202 prefix: std::env::var("CODETETHER_BUS_S3_PREFIX")
203 .unwrap_or_else(|_| "training/".to_string()),
204 batch_size: 100,
205 flush_interval_secs: 30,
206 secure: false,
207 ignore_cert: false,
208 });
209 }
210 }
211
212 anyhow::bail!(
213 "No MinIO credentials found. Set MINIO_ENDPOINT/MINIO_ACCESS_KEY/MINIO_SECRET_KEY, \
214 CODETETHER_CHAT_SYNC_MINIO_* env vars, or configure chat-sync-minio in Vault."
215 )
216 }
217}
218
/// Reads the environment variable `key`, treating an unset, non-Unicode, or
/// empty value as absent.
fn env_non_empty(key: &str) -> Option<String> {
    match std::env::var(key) {
        Ok(value) if !value.is_empty() => Some(value),
        _ => None,
    }
}
223
224fn vault_extra_str(secrets: &secrets::ProviderSecrets, keys: &[&str]) -> Option<String> {
226 for key in keys {
227 if let Some(val) = secrets.extra.get(*key) {
228 if let Some(s) = val.as_str() {
229 if !s.is_empty() {
230 return Some(s.to_string());
231 }
232 }
233 }
234 }
235 None
236}
237
238#[derive(Debug, Clone, Serialize, Deserialize)]
246struct TrainingRecord {
247 role: String,
249 #[serde(skip_serializing_if = "Option::is_none")]
251 content: Option<String>,
252 #[serde(skip_serializing_if = "Option::is_none")]
254 tool_calls: Option<Vec<TrainingToolCall>>,
255 #[serde(skip_serializing_if = "Option::is_none")]
257 tool_call_id: Option<String>,
258 #[serde(skip_serializing_if = "Option::is_none")]
260 name: Option<String>,
261 metadata: TrainingMetadata,
263}
264
265#[derive(Debug, Clone, Serialize, Deserialize)]
267struct TrainingToolCall {
268 id: String,
270 #[serde(rename = "type")]
272 call_type: String,
273 function: TrainingFunction,
275}
276
277#[derive(Debug, Clone, Serialize, Deserialize)]
279struct TrainingFunction {
280 name: String,
281 arguments: String,
282}
283
284#[derive(Debug, Clone, Serialize, Deserialize)]
286struct TrainingMetadata {
287 bus_kind: String,
289 envelope_id: String,
291 timestamp: String,
293 topic: String,
295 sender_id: String,
297 #[serde(skip_serializing_if = "Option::is_none")]
299 correlation_id: Option<String>,
300}
301
302fn envelope_to_training_record(env: &BusEnvelope) -> TrainingRecord {
304 let meta = TrainingMetadata {
305 bus_kind: bus_message_kind(&env.message),
306 envelope_id: env.id.clone(),
307 timestamp: env.timestamp.to_rfc3339(),
308 topic: env.topic.clone(),
309 sender_id: env.sender_id.clone(),
310 correlation_id: env.correlation_id.clone(),
311 };
312
313 match &env.message {
314 BusMessage::AgentReady {
316 agent_id,
317 capabilities,
318 } => TrainingRecord {
319 role: "system".into(),
320 content: Some(format!(
321 "Agent `{agent_id}` ready. Capabilities: {}",
322 capabilities.join(", ")
323 )),
324 tool_calls: None,
325 tool_call_id: None,
326 name: None,
327 metadata: meta,
328 },
329
330 BusMessage::AgentShutdown { agent_id } => TrainingRecord {
331 role: "system".into(),
332 content: Some(format!("Agent `{agent_id}` shutting down.")),
333 tool_calls: None,
334 tool_call_id: None,
335 name: None,
336 metadata: meta,
337 },
338
339 BusMessage::AgentMessage { from, to, parts } => {
341 let text = parts_to_text(parts);
342 TrainingRecord {
343 role: "assistant".into(),
344 content: Some(format!("[{from} → {to}] {text}")),
345 tool_calls: None,
346 tool_call_id: None,
347 name: None,
348 metadata: meta,
349 }
350 }
351
352 BusMessage::TaskUpdate {
354 task_id,
355 state,
356 message,
357 } => {
358 let msg = message.as_deref().unwrap_or("");
359 TrainingRecord {
360 role: "system".into(),
361 content: Some(format!("Task `{task_id}` → {state:?}. {msg}")),
362 tool_calls: None,
363 tool_call_id: None,
364 name: None,
365 metadata: meta,
366 }
367 }
368
369 BusMessage::ArtifactUpdate { task_id, artifact } => {
370 let artifact_text = parts_to_text(&artifact.parts);
371 TrainingRecord {
372 role: "system".into(),
373 content: Some(format!(
374 "Task `{task_id}` artifact `{}`: {artifact_text}",
375 artifact.artifact_id
376 )),
377 tool_calls: None,
378 tool_call_id: None,
379 name: None,
380 metadata: meta,
381 }
382 }
383
384 BusMessage::SharedResult { key, value, tags } => TrainingRecord {
386 role: "system".into(),
387 content: Some(format!(
388 "Shared result `{key}` [{}]: {}",
389 tags.join(", "),
390 serde_json::to_string(value).unwrap_or_default()
391 )),
392 tool_calls: None,
393 tool_call_id: None,
394 name: None,
395 metadata: meta,
396 },
397
398 BusMessage::ToolRequest {
400 request_id,
401 tool_name,
402 arguments,
403 ..
404 } => TrainingRecord {
405 role: "assistant".into(),
406 content: None,
407 tool_calls: Some(vec![TrainingToolCall {
408 id: request_id.clone(),
409 call_type: "function".into(),
410 function: TrainingFunction {
411 name: tool_name.clone(),
412 arguments: serde_json::to_string(arguments).unwrap_or_default(),
413 },
414 }]),
415 tool_call_id: None,
416 name: None,
417 metadata: meta,
418 },
419
420 BusMessage::ToolResponse {
422 request_id,
423 tool_name,
424 result,
425 success,
426 ..
427 } => TrainingRecord {
428 role: "tool".into(),
429 content: Some(if *success {
430 result.clone()
431 } else {
432 format!("[ERROR] {result}")
433 }),
434 tool_calls: None,
435 tool_call_id: Some(request_id.clone()),
436 name: Some(tool_name.clone()),
437 metadata: meta,
438 },
439
440 BusMessage::ToolOutputFull {
442 agent_id,
443 tool_name,
444 output,
445 success,
446 step,
447 } => TrainingRecord {
448 role: "tool".into(),
449 content: Some(if *success {
450 format!("[step {step}, agent {agent_id}] {output}")
451 } else {
452 format!("[step {step}, agent {agent_id}, ERROR] {output}")
453 }),
454 tool_calls: None,
455 tool_call_id: None,
456 name: Some(tool_name.clone()),
457 metadata: meta,
458 },
459
460 BusMessage::Heartbeat { .. } => TrainingRecord {
462 role: "system".into(),
463 content: None,
464 tool_calls: None,
465 tool_call_id: None,
466 name: None,
467 metadata: meta,
468 },
469
470 BusMessage::RalphLearning {
472 prd_id,
473 story_id,
474 iteration,
475 learnings,
476 context,
477 } => TrainingRecord {
478 role: "system".into(),
479 content: Some(format!(
480 "Ralph learning (PRD {prd_id}, story {story_id}, iter {iteration}):\n{}\nContext: {}",
481 learnings
482 .iter()
483 .map(|l| format!("- {l}"))
484 .collect::<Vec<_>>()
485 .join("\n"),
486 serde_json::to_string(context).unwrap_or_default()
487 )),
488 tool_calls: None,
489 tool_call_id: None,
490 name: None,
491 metadata: meta,
492 },
493
494 BusMessage::RalphHandoff {
495 prd_id,
496 from_story,
497 to_story,
498 context,
499 progress_summary,
500 } => TrainingRecord {
501 role: "system".into(),
502 content: Some(format!(
503 "Ralph handoff (PRD {prd_id}): {from_story} → {to_story}\nSummary: {progress_summary}\nContext: {}",
504 serde_json::to_string(context).unwrap_or_default()
505 )),
506 tool_calls: None,
507 tool_call_id: None,
508 name: None,
509 metadata: meta,
510 },
511
512 BusMessage::RalphProgress {
513 prd_id,
514 passed,
515 total,
516 iteration,
517 status,
518 } => TrainingRecord {
519 role: "system".into(),
520 content: Some(format!(
521 "Ralph progress (PRD {prd_id}): {passed}/{total} stories passed, iter {iteration}, status: {status}"
522 )),
523 tool_calls: None,
524 tool_call_id: None,
525 name: None,
526 metadata: meta,
527 },
528
529 BusMessage::AgentThinking {
531 agent_id,
532 thinking,
533 step,
534 } => TrainingRecord {
535 role: "assistant".into(),
536 content: Some(format!("<thinking>\n{thinking}\n</thinking>")),
537 tool_calls: None,
538 tool_call_id: None,
539 name: Some(format!("reasoning.{agent_id}.step_{step}")),
540 metadata: meta,
541 },
542
543 BusMessage::VoiceSessionStarted {
545 room_name,
546 agent_id,
547 voice_id,
548 } => TrainingRecord {
549 role: "system".into(),
550 content: Some(format!(
551 "Voice session started: room={room_name}, agent={agent_id}, voice={voice_id}"
552 )),
553 tool_calls: None,
554 tool_call_id: None,
555 name: None,
556 metadata: meta,
557 },
558
559 BusMessage::VoiceTranscript {
560 room_name,
561 text,
562 role,
563 is_final,
564 } => TrainingRecord {
565 role: if role == "user" {
566 "user".into()
567 } else {
568 "assistant".into()
569 },
570 content: Some(format!(
571 "[voice:{room_name}{}] {text}",
572 if *is_final { " final" } else { "" }
573 )),
574 tool_calls: None,
575 tool_call_id: None,
576 name: None,
577 metadata: meta,
578 },
579
580 BusMessage::VoiceAgentStateChanged { room_name, state } => TrainingRecord {
581 role: "system".into(),
582 content: Some(format!("Voice agent state: room={room_name} → {state}")),
583 tool_calls: None,
584 tool_call_id: None,
585 name: None,
586 metadata: meta,
587 },
588
589 BusMessage::VoiceSessionEnded { room_name, reason } => TrainingRecord {
590 role: "system".into(),
591 content: Some(format!(
592 "Voice session ended: room={room_name}, reason={reason}"
593 )),
594 tool_calls: None,
595 tool_call_id: None,
596 name: None,
597 metadata: meta,
598 },
599 }
600}
601
602fn bus_message_kind(msg: &BusMessage) -> String {
604 serde_json::to_value(msg)
605 .ok()
606 .and_then(|v| v.get("kind").and_then(|k| k.as_str()).map(String::from))
607 .unwrap_or_else(|| "unknown".into())
608}
609
610fn parts_to_text(parts: &[Part]) -> String {
612 parts
613 .iter()
614 .map(|p| match p {
615 Part::Text { text } => text.as_str(),
616 Part::Data { .. } => "<<data>>",
617 Part::File { .. } => "<<file>>",
618 })
619 .collect::<Vec<_>>()
620 .join("\n")
621}
622
623pub struct BusS3Sink {
627 #[allow(dead_code)]
628 bus: Arc<AgentBus>,
629 client: MinioClient,
630 config: BusS3SinkConfig,
631 rx: broadcast::Receiver<BusEnvelope>,
632}
633
634impl BusS3Sink {
635 pub async fn new(
637 bus: Arc<AgentBus>,
638 endpoint: &str,
639 access_key: &str,
640 secret_key: &str,
641 bucket: &str,
642 prefix: &str,
643 ) -> Result<Self> {
644 let config = BusS3SinkConfig {
645 endpoint: endpoint.to_string(),
646 access_key: access_key.to_string(),
647 secret_key: secret_key.to_string(),
648 bucket: bucket.to_string(),
649 prefix: prefix.to_string(),
650 batch_size: 100,
651 flush_interval_secs: 30,
652 secure: endpoint.starts_with("https"),
653 ignore_cert: false,
654 };
655
656 Self::from_config(bus, config).await
657 }
658
659 pub async fn from_config(bus: Arc<AgentBus>, config: BusS3SinkConfig) -> Result<Self> {
661 let endpoint = normalize_endpoint(&config.endpoint, config.secure);
662
663 let base_url: BaseUrl = endpoint.parse().context("Invalid MinIO endpoint URL")?;
664
665 let static_provider = StaticProvider::new(&config.access_key, &config.secret_key, None);
666
667 let client = MinioClientBuilder::new(base_url)
668 .provider(Some(Box::new(static_provider)))
669 .ignore_cert_check(Some(config.ignore_cert))
670 .build()?;
671
672 let rx = bus.tx.subscribe();
673
674 Ok(Self {
675 bus,
676 client,
677 config,
678 rx,
679 })
680 }
681
682 pub async fn from_env(bus: Arc<AgentBus>) -> Result<Self> {
684 let config = BusS3SinkConfig::from_env()?;
685 Self::from_config(bus, config).await
686 }
687
688 pub async fn ensure_bucket(&self) -> Result<()> {
690 match self.client.bucket_exists(&self.config.bucket).send().await {
691 Ok(resp) if resp.exists => {
692 debug!(bucket = %self.config.bucket, "S3 bucket exists");
693 }
694 Ok(_) => {
695 info!(bucket = %self.config.bucket, "Creating S3 bucket");
696 match self.client.create_bucket(&self.config.bucket).send().await {
697 Ok(_) => {}
698 Err(e) => {
699 let err_text = e.to_string();
700 if !err_text.contains("BucketAlreadyOwnedByYou")
701 && !err_text.contains("BucketAlreadyExists")
702 {
703 return Err(anyhow::anyhow!("Failed to create bucket: {err_text}"));
704 }
705 debug!(bucket = %self.config.bucket, "Bucket already exists");
706 }
707 }
708 }
709 Err(e) => {
710 debug!(error = %e, bucket = %self.config.bucket, "Bucket check returned error (may already exist)");
711 }
712 }
713 Ok(())
714 }
715
716 pub async fn run(mut self) -> Result<()> {
718 self.ensure_bucket().await?;
719
720 info!(
721 bucket = %self.config.bucket,
722 prefix = %self.config.prefix,
723 batch_size = self.config.batch_size,
724 flush_secs = self.config.flush_interval_secs,
725 "Bus S3 sink started (JSONL training record format)"
726 );
727
728 let mut batch: Vec<BusEnvelope> = Vec::with_capacity(self.config.batch_size);
729 let mut batch_start: Option<String> = None;
730 let mut flush_interval = tokio::time::interval(std::time::Duration::from_secs(
731 self.config.flush_interval_secs,
732 ));
733
734 loop {
735 tokio::select! {
736 result = self.rx.recv() => {
737 match result {
738 Ok(envelope) => {
739 if batch_start.is_none() {
740 batch_start = Some(envelope.timestamp.to_rfc3339());
741 }
742 batch.push(envelope);
743
744 if batch.len() >= self.config.batch_size {
745 if let Err(e) = self.flush_batch(&mut batch, &mut batch_start).await {
746 error!(error = %e, "Failed to flush batch");
747 }
748 task::yield_now().await;
750 }
751 }
752 Err(broadcast::error::RecvError::Lagged(n)) => {
753 warn!(skipped = n, "Bus S3 sink lagged, some messages dropped");
754 }
755 Err(broadcast::error::RecvError::Closed) => {
756 info!("Bus channel closed, shutting down S3 sink");
757 if !batch.is_empty() {
758 if let Err(e) = self.flush_batch(&mut batch, &mut batch_start).await {
759 error!(error = %e, "Failed to flush final batch");
760 }
761 }
762 return Ok(());
763 }
764 }
765 }
766
767 _ = flush_interval.tick() => {
768 if !batch.is_empty() {
769 if let Err(e) = self.flush_batch(&mut batch, &mut batch_start).await {
770 error!(error = %e, "Failed to flush batch on interval");
771 }
772 task::yield_now().await;
774 }
775 }
776 }
777 }
778 }
779
780 async fn flush_batch(
786 &self,
787 batch: &mut Vec<BusEnvelope>,
788 batch_start: &mut Option<String>,
789 ) -> Result<()> {
790 if batch.is_empty() {
791 return Ok(());
792 }
793
794 let _start_time = batch_start
795 .take()
796 .unwrap_or_else(|| Utc::now().to_rfc3339());
797 let envelopes = std::mem::take(batch);
798
799 let mut lines = Vec::with_capacity(envelopes.len());
801 for env in &envelopes {
802 if matches!(env.message, BusMessage::Heartbeat { .. }) {
803 continue;
804 }
805 let record = envelope_to_training_record(env);
806 if let Ok(line) = serde_json::to_string(&record) {
807 lines.push(line);
808 }
809 }
810
811 if lines.is_empty() {
812 return Ok(());
813 }
814
815 let count = lines.len();
816 let jsonl = lines.join("\n");
817
818 let now = Utc::now();
820 let date_path = now.format("%Y/%m/%d/%H").to_string();
821 let timestamp = now.format("%Y%m%dT%H%M%S").to_string();
822 let uuid = uuid::Uuid::new_v4();
823 let s3_key = format!(
824 "{}{}/batch_{}_{}.jsonl",
825 self.config.prefix, date_path, timestamp, uuid
826 );
827
828 let content = ObjectContent::from(jsonl.into_bytes());
829
830 match self
831 .client
832 .put_object_content(&self.config.bucket, &s3_key, content)
833 .send()
834 .await
835 {
836 Ok(_) => {
837 info!(
838 bucket = %self.config.bucket,
839 key = %s3_key,
840 records = count,
841 "Uploaded training records to S3"
842 );
843 }
844 Err(e) => {
845 error!(
846 bucket = %self.config.bucket,
847 key = %s3_key,
848 error = %e,
849 "Failed to upload training records to S3"
850 );
851 return Err(anyhow::anyhow!("S3 upload failed: {e}"));
852 }
853 }
854
855 Ok(())
856 }
857
858 pub fn bucket(&self) -> &str {
860 &self.config.bucket
861 }
862
863 pub fn prefix(&self) -> &str {
865 &self.config.prefix
866 }
867}
868
/// Turns a user-supplied endpoint into a URL string with an explicit scheme.
///
/// Surrounding whitespace and trailing slashes are removed. An existing
/// `http://` or `https://` scheme is recognized case-insensitively and emitted
/// in lower case; otherwise `secure` chooses between `https://` and `http://`.
fn normalize_endpoint(endpoint: &str, secure: bool) -> String {
    let endpoint = endpoint.trim().trim_end_matches('/');

    for scheme in ["https://", "http://"].iter() {
        // `get` yields `None` for short inputs or non-boundary offsets, so this
        // cannot panic on arbitrary text.
        if let Some(head) = endpoint.get(..scheme.len()) {
            if head.eq_ignore_ascii_case(scheme) {
                return format!("{scheme}{}", &endpoint[scheme.len()..]);
            }
        }
    }

    let scheme = if secure { "https" } else { "http" };
    format!("{scheme}://{endpoint}")
}
881
882pub fn spawn_bus_s3_sink(bus: Arc<AgentBus>) -> tokio::task::JoinHandle<()> {
890 tokio::spawn(async move {
891 match BusS3SinkConfig::from_env_or_vault().await {
892 Ok(config) => match BusS3Sink::from_config(bus, config).await {
893 Ok(sink) => {
894 if let Err(e) = sink.run().await {
895 error!(error = %e, "Bus S3 sink failed");
896 }
897 }
898 Err(e) => {
899 error!(error = %e, "Bus S3 sink failed to initialize");
900 }
901 },
902 Err(e) => {
903 warn!(
904 error = %e,
905 "Bus S3 sink not configured - set MINIO_*/CODETETHER_CHAT_SYNC_MINIO_* env vars or configure chat-sync-minio in Vault"
906 );
907 }
908 }
909 })
910}
911
#[cfg(test)]
mod tests {
    use super::*;

    /// Wraps `message` in an envelope stamped with the current time.
    fn envelope(
        id: &str,
        topic: &str,
        sender_id: &str,
        correlation_id: Option<&str>,
        message: BusMessage,
    ) -> BusEnvelope {
        BusEnvelope {
            id: id.into(),
            topic: topic.into(),
            sender_id: sender_id.into(),
            correlation_id: correlation_id.map(str::to_owned),
            timestamp: Utc::now(),
            message,
        }
    }

    #[test]
    fn test_normalize_endpoint() {
        // (input, secure, expected)
        let cases = [
            ("localhost:9000", false, "http://localhost:9000"),
            ("localhost:9000", true, "https://localhost:9000"),
            ("http://localhost:9000/", false, "http://localhost:9000"),
            ("https://minio.example.com/", true, "https://minio.example.com"),
        ];
        for (input, secure, expected) in cases.iter() {
            assert_eq!(normalize_endpoint(input, *secure), *expected);
        }
    }

    #[test]
    fn test_config_defaults() {
        let config = BusS3SinkConfig {
            endpoint: "http://localhost:9000".to_string(),
            access_key: "key".to_string(),
            secret_key: "secret".to_string(),
            bucket: "test".to_string(),
            prefix: default_prefix(),
            batch_size: default_batch_size(),
            flush_interval_secs: default_flush_interval_secs(),
            secure: false,
            ignore_cert: false,
        };

        assert_eq!(config.prefix, "training/");
        assert_eq!(config.batch_size, 100);
        assert_eq!(config.flush_interval_secs, 30);
    }

    #[test]
    fn test_training_record_tool_request() {
        let env = envelope(
            "env-1",
            "tools.read_file",
            "agent-0",
            Some("corr-1"),
            BusMessage::ToolRequest {
                request_id: "req-1".into(),
                agent_id: "agent-0".into(),
                tool_name: "read_file".into(),
                arguments: serde_json::json!({"path": "/src/main.rs"}),
            },
        );

        let record = envelope_to_training_record(&env);
        assert_eq!(record.role, "assistant");
        assert!(record.content.is_none());
        let calls = record.tool_calls.unwrap();
        assert_eq!(calls.len(), 1);
        assert_eq!(calls[0].function.name, "read_file");
        assert_eq!(calls[0].call_type, "function");
        assert_eq!(record.metadata.bus_kind, "tool_request");
    }

    #[test]
    fn test_training_record_tool_response() {
        let env = envelope(
            "env-2",
            "tools.read_file",
            "agent-0",
            Some("corr-1"),
            BusMessage::ToolResponse {
                request_id: "req-1".into(),
                agent_id: "agent-0".into(),
                tool_name: "read_file".into(),
                result: "fn main() {}".into(),
                success: true,
            },
        );

        let record = envelope_to_training_record(&env);
        assert_eq!(record.role, "tool");
        assert_eq!(record.content.as_deref(), Some("fn main() {}"));
        assert_eq!(record.tool_call_id.as_deref(), Some("req-1"));
        assert_eq!(record.name.as_deref(), Some("read_file"));
        assert_eq!(record.metadata.bus_kind, "tool_response");
    }

    #[test]
    fn test_training_record_agent_message() {
        let env = envelope(
            "env-3",
            "agent.planner",
            "coder",
            None,
            BusMessage::AgentMessage {
                from: "coder".into(),
                to: "planner".into(),
                parts: vec![Part::Text {
                    text: "I fixed the bug".into(),
                }],
            },
        );

        let record = envelope_to_training_record(&env);
        assert_eq!(record.role, "assistant");
        let content = record.content.as_deref().unwrap();
        assert!(content.contains("I fixed the bug"));
        assert!(content.contains("[coder → planner]"));
    }

    #[test]
    fn test_heartbeat_skipped_role() {
        let env = envelope(
            "env-4",
            "broadcast",
            "agent-0",
            None,
            BusMessage::Heartbeat {
                agent_id: "agent-0".into(),
                status: "ok".into(),
            },
        );

        // The converter still produces a record; it is the sink that filters
        // heartbeats out before upload.
        let record = envelope_to_training_record(&env);
        assert_eq!(record.role, "system");
        assert!(record.content.is_none());
    }
}