#[cfg(test)]
mod tests {
use std::path::PathBuf;
use std::sync::Arc;
use tempfile::TempDir;
use crate::config::{SipFlowClusterNode, SipFlowConfig, SipFlowEngine, SipFlowSubdirs};
use crate::sipflow::backend::create_backend;
use crate::sipflow::{SipFlowBackend, SipFlowItem, SipFlowMsgType};
use chrono::{Local, TimeZone};
/// Builds an RTP-typed [`SipFlowItem`] for the given call leg.
///
/// The timestamp is the current UTC time in microseconds, both addresses are fixed
/// loopback endpoints, and the payload is 20 zero bytes.
fn make_rtp_item(leg: i32) -> SipFlowItem {
    let now_micros = chrono::Utc::now().timestamp_micros() as u64;
    let zeroed_payload = bytes::Bytes::from(vec![0u8; 20]);

    SipFlowItem {
        timestamp: now_micros,
        seq: 0,
        leg: Some(leg),
        msg_type: SipFlowMsgType::Rtp,
        src_addr: String::from("127.0.0.1:5000"),
        dst_addr: String::from("127.0.0.1:5001"),
        payload: zeroed_payload,
    }
}
/// Flushing a freshly created SQLite-engine local backend, with nothing recorded,
/// must succeed.
#[tokio::test]
async fn test_local_backend_flush_no_error() {
    // Storage lives in a temporary directory that is removed when `tmp` is dropped.
    let tmp = TempDir::new().unwrap();
    let root = tmp.path().to_string_lossy().into_owned();

    let local_cfg = SipFlowConfig::Local {
        root,
        subdirs: SipFlowSubdirs::None,
        engine: SipFlowEngine::Sqlite,
        flush_count: 10000,
        flush_interval_secs: 3600,
        id_cache_size: 64,
        ttl_secs: None,
        memtable_size_mb: 64,
        block_cache_capacity_mb: 128,
        upload: None,
    };

    let store = create_backend(&local_cfg).expect("backend creation should succeed");
    store.flush().await.expect("flush should not error");
}
/// Records five RTP items for one call on a SQLite-engine local backend, flushes,
/// and checks that at least one `.db` file then exists directly under the storage root.
#[tokio::test]
async fn test_local_backend_flush_writes_to_disk() {
    let dir = TempDir::new().unwrap();
    let root = dir.path().to_string_lossy().into_owned();

    let cfg = SipFlowConfig::Local {
        root: root.clone(),
        subdirs: SipFlowSubdirs::None,
        engine: SipFlowEngine::Sqlite,
        flush_count: 10000,
        flush_interval_secs: 3600,
        id_cache_size: 64,
        ttl_secs: None,
        memtable_size_mb: 64,
        block_cache_capacity_mb: 128,
        upload: None,
    };
    let backend: Arc<dyn SipFlowBackend> =
        Arc::from(create_backend(&cfg).expect("backend creation should succeed"));

    (0..5).for_each(|_| {
        backend
            .record("test-call-1", make_rtp_item(0))
            .expect("record should succeed");
    });
    backend.flush().await.expect("flush should not error");

    // Collect every top-level entry whose extension is exactly `db`.
    let mut db_files: Vec<PathBuf> = Vec::new();
    for entry in std::fs::read_dir(&root).unwrap() {
        // Entries that cannot be read are skipped rather than failing the test.
        if let Ok(entry) = entry {
            let path = entry.path();
            if matches!(path.extension(), Some(ext) if ext == "db") {
                db_files.push(path);
            }
        }
    }

    assert!(
        !db_files.is_empty(),
        "expected at least one .db file after flush; got none in {root}"
    );
}
/// A remote config that sets only the single-endpoint `udp_addr` / `http_addr` fields
/// (with an empty `nodes` list) must produce a backend.
#[tokio::test]
async fn test_remote_backend_legacy_format() {
    let cfg = SipFlowConfig::Remote {
        nodes: vec![],
        udp_addr: Some("127.0.0.1:3000".to_string()),
        http_addr: Some("http://127.0.0.1:3001".to_string()),
        timeout_secs: 10,
        upload: None,
    };

    // `expect` instead of `assert!(.. .is_ok())` so that a failure reports the underlying
    // error, the same way the local-backend tests in this module unwrap `create_backend`.
    let _backend = create_backend(&cfg).expect("legacy format should create backend");
}
/// A remote config whose `udp_addr` / `http_addr` use a host name (`localhost`) rather
/// than an IP literal must produce a backend.
#[tokio::test]
async fn test_remote_backend_domain_udp_addr() {
    let cfg = SipFlowConfig::Remote {
        nodes: vec![],
        udp_addr: Some("localhost:3000".to_string()),
        http_addr: Some("http://localhost:3001".to_string()),
        timeout_secs: 10,
        upload: None,
    };

    // `expect` instead of `assert!(.. .is_ok())` so that a failure reports the underlying
    // error, the same way the local-backend tests in this module unwrap `create_backend`.
    let _backend = create_backend(&cfg).expect("domain UDP address should create backend");
}
/// A remote config that lists two cluster nodes, and leaves the single-endpoint
/// `udp_addr` / `http_addr` fields unset, must produce a backend.
#[tokio::test]
async fn test_remote_backend_multi_node_format() {
    let cfg = SipFlowConfig::Remote {
        nodes: vec![
            SipFlowClusterNode {
                udp: "192.168.1.1:3000".to_string(),
                http: "http://192.168.1.1:3001".to_string(),
            },
            SipFlowClusterNode {
                udp: "192.168.1.2:3000".to_string(),
                http: "http://192.168.1.2:3001".to_string(),
            },
        ],
        udp_addr: None,
        http_addr: None,
        timeout_secs: 10,
        upload: None,
    };

    // `expect` instead of `assert!(.. .is_ok())` so that a failure reports the underlying
    // error, the same way the local-backend tests in this module unwrap `create_backend`.
    let _backend = create_backend(&cfg).expect("multi-node format should create backend");
}
/// With no cluster nodes and neither `udp_addr` nor `http_addr` set, creating a remote
/// backend must return an error.
#[test]
fn test_remote_backend_missing_config() {
    let empty_remote = SipFlowConfig::Remote {
        nodes: Vec::new(),
        udp_addr: None,
        http_addr: None,
        timeout_secs: 10,
        upload: None,
    };

    let outcome = create_backend(&empty_remote);
    let creation_failed = outcome.is_err();
    assert!(
        creation_failed,
        "expected error when neither nodes nor udp_addr/http_addr provided"
    );
}
/// Creates a FlowDb-engine local backend in a temporary directory and verifies that
/// flushing it with nothing recorded succeeds.
///
/// NOTE(review): despite the test name, the engine is set explicitly to
/// `SipFlowEngine::FlowDb` here rather than left to a default.
#[tokio::test]
async fn test_flowdb_backend_default_engine() {
    let tmp = TempDir::new().unwrap();
    let root = tmp.path().to_string_lossy().into_owned();

    let flowdb_cfg = SipFlowConfig::Local {
        root,
        subdirs: SipFlowSubdirs::None,
        engine: SipFlowEngine::FlowDb,
        flush_count: 1000,
        flush_interval_secs: 5,
        id_cache_size: 1024,
        ttl_secs: None,
        memtable_size_mb: 1,
        block_cache_capacity_mb: 16,
        upload: None,
    };

    let store = create_backend(&flowdb_cfg).expect("flowdb backend creation should succeed");
    store.flush().await.expect("flush should not error");
}
/// Round-trips one SIP message through a FlowDb-engine backend built by `create_backend`.
///
/// The item is recorded and flushed, then queried back with a window of one microsecond
/// either side of its timestamp. Exactly one item is expected, and its payload must start
/// with `INVITE`.
#[tokio::test]
async fn test_flowdb_backend_factory_record_and_query() {
    use crate::sipflow::{SipFlowBackend, SipFlowItem, SipFlowMsgType};
    use bytes::Bytes;

    let dir = TempDir::new().unwrap();
    let root = dir.path().to_string_lossy().into_owned();
    let cfg = SipFlowConfig::Local {
        root,
        subdirs: SipFlowSubdirs::None,
        engine: SipFlowEngine::FlowDb,
        flush_count: 1000,
        flush_interval_secs: 5,
        id_cache_size: 1024,
        ttl_secs: None,
        memtable_size_mb: 1,
        block_cache_capacity_mb: 16,
        upload: None,
    };
    let backend: Arc<dyn SipFlowBackend> =
        Arc::from(create_backend(&cfg).expect("backend creation should succeed"));

    // `base` is kept signed so the query window below can be derived by plain +/- 1.
    let base = chrono::Utc::now().timestamp_micros();
    let invite = SipFlowItem {
        timestamp: base as u64,
        seq: 0,
        leg: None,
        msg_type: SipFlowMsgType::Sip,
        src_addr: String::from("127.0.0.1:5060"),
        dst_addr: String::from("127.0.0.2:5060"),
        payload: Bytes::from_static(b"INVITE sip:test SIP/2.0\r\nCall-ID: factory-test\r\n"),
    };

    backend
        .record("factory-test", invite)
        .expect("record should succeed");
    backend.flush().await.expect("flush should succeed");

    let window_start = Local.timestamp_micros(base - 1).single().expect("valid dt");
    let window_end = Local.timestamp_micros(base + 1).single().expect("valid dt");
    let items = backend
        .query_flow("factory-test", window_start, window_end)
        .await
        .expect("query should succeed");

    assert_eq!(items.len(), 1);
    assert!(items[0].payload.starts_with(b"INVITE"));
}
/// Parses a local config with `engine = "flowdb"` from TOML and checks the engine along
/// with the explicitly set `ttl_secs`, `memtable_size_mb` and `block_cache_capacity_mb`.
#[test]
fn test_config_parse_flowdb_engine() {
    // The raw string stays flush-left so the parsed TOML text is exactly these lines.
    let toml_str = r#"
type = "local"
root = "/var/sipflow"
engine = "flowdb"
ttl_secs = 3600
memtable_size_mb = 32
block_cache_capacity_mb = 64
"#;
    let parsed = toml::from_str::<crate::config::SipFlowConfig>(toml_str)
        .expect("should parse flowdb config");

    if let crate::config::SipFlowConfig::Local {
        engine,
        ttl_secs,
        memtable_size_mb,
        block_cache_capacity_mb,
        ..
    } = parsed
    {
        assert_eq!(engine, SipFlowEngine::FlowDb);
        assert_eq!(ttl_secs, Some(3600));
        assert_eq!(memtable_size_mb, 32);
        assert_eq!(block_cache_capacity_mb, 64);
    } else {
        panic!("expected Local config");
    }
}
/// Parses a local config with `engine = "sqlite"` from TOML and checks that it
/// deserializes to `SipFlowEngine::Sqlite`.
#[test]
fn test_config_parse_sqlite_engine() {
    // The raw string stays flush-left so the parsed TOML text is exactly these lines.
    let toml_str = r#"
type = "local"
root = "/var/sipflow"
engine = "sqlite"
"#;
    let parsed = toml::from_str::<crate::config::SipFlowConfig>(toml_str)
        .expect("should parse sqlite config");

    if let crate::config::SipFlowConfig::Local { engine, .. } = parsed {
        assert_eq!(engine, SipFlowEngine::Sqlite);
    } else {
        panic!("expected Local config");
    }
}
/// When a local config omits `engine`, the parsed value must be `SipFlowEngine::Sqlite`.
#[test]
fn test_config_default_engine_is_sqlite() {
    // The raw string stays flush-left so the parsed TOML text is exactly these lines.
    let toml_str = r#"
type = "local"
root = "/var/sipflow"
"#;
    let parsed = toml::from_str::<crate::config::SipFlowConfig>(toml_str)
        .expect("should parse default config");

    // Pull the engine out first; any non-local variant is a test failure.
    let engine = match parsed {
        crate::config::SipFlowConfig::Local { engine, .. } => engine,
        _ => panic!("expected Local config"),
    };
    assert_eq!(engine, SipFlowEngine::Sqlite);
}
}