#[cfg(test)]
mod tests {
use std::path::PathBuf;
use std::sync::Arc;
use tempfile::TempDir;
use crate::config::{SipFlowClusterNode, SipFlowConfig, SipFlowSubdirs};
use crate::sipflow::backend::create_backend;
use crate::sipflow::{SipFlowBackend, SipFlowItem, SipFlowMsgType};
fn make_rtp_item(leg: &str) -> SipFlowItem {
SipFlowItem {
timestamp: chrono::Utc::now().timestamp_micros() as u64,
seq: 0,
msg_type: SipFlowMsgType::Rtp,
src_addr: format!("{}_127.0.0.1:5000", leg),
dst_addr: "127.0.0.1:5001".to_string(),
payload: bytes::Bytes::from(vec![0u8; 20]),
}
}
#[tokio::test]
async fn test_local_backend_flush_no_error() {
let dir = TempDir::new().unwrap();
let cfg = SipFlowConfig::Local {
root: dir.path().to_string_lossy().to_string(),
subdirs: SipFlowSubdirs::None,
flush_count: 10000,
flush_interval_secs: 3600,
id_cache_size: 64,
upload: None,
};
let backend = create_backend(&cfg).expect("backend creation should succeed");
backend.flush().await.expect("flush should not error");
}
#[tokio::test]
async fn test_local_backend_flush_writes_to_disk() {
let dir = TempDir::new().unwrap();
let root = dir.path().to_string_lossy().to_string();
let cfg = SipFlowConfig::Local {
root: root.clone(),
subdirs: SipFlowSubdirs::None,
flush_count: 10000,
flush_interval_secs: 3600,
id_cache_size: 64,
upload: None,
};
let backend: Arc<dyn SipFlowBackend> =
Arc::from(create_backend(&cfg).expect("backend creation should succeed"));
for _ in 0..5 {
backend
.record("test-call-1", make_rtp_item("LegA"))
.expect("record should succeed");
}
backend.flush().await.expect("flush should not error");
let db_files: Vec<PathBuf> = std::fs::read_dir(&root)
.unwrap()
.flatten()
.map(|e| e.path())
.filter(|p| p.extension().map(|e| e == "db").unwrap_or(false))
.collect();
assert!(
!db_files.is_empty(),
"expected at least one .db file after flush; got none in {root}"
);
}
#[tokio::test]
async fn test_remote_backend_legacy_format() {
let cfg = SipFlowConfig::Remote {
nodes: vec![],
udp_addr: Some("127.0.0.1:3000".to_string()),
http_addr: Some("http://127.0.0.1:3001".to_string()),
timeout_secs: 10,
upload: None,
};
let backend = create_backend(&cfg);
assert!(backend.is_ok(), "legacy format should create backend");
}
#[tokio::test]
async fn test_remote_backend_multi_node_format() {
let cfg = SipFlowConfig::Remote {
nodes: vec![
SipFlowClusterNode {
udp: "192.168.1.1:3000".to_string(),
http: "http://192.168.1.1:3001".to_string(),
},
SipFlowClusterNode {
udp: "192.168.1.2:3000".to_string(),
http: "http://192.168.1.2:3001".to_string(),
},
],
udp_addr: None,
http_addr: None,
timeout_secs: 10,
upload: None,
};
let backend = create_backend(&cfg);
assert!(backend.is_ok(), "multi-node format should create backend");
}
#[test]
fn test_remote_backend_missing_config() {
let cfg = SipFlowConfig::Remote {
nodes: vec![],
udp_addr: None,
http_addr: None,
timeout_secs: 10,
upload: None,
};
let result = create_backend(&cfg);
assert!(
result.is_err(),
"expected error when neither nodes nor udp_addr/http_addr provided"
);
}
}