1use crate::{
2 call::ActiveCallType,
3 config::{CallRecordConfig, S3Vendor},
4};
5use anyhow::Result;
6use chrono::{DateTime, Utc};
7use futures::stream::{FuturesUnordered, StreamExt};
8use object_store::{
9 ObjectStore, aws::AmazonS3Builder, azure::MicrosoftAzureBuilder,
10 gcp::GoogleCloudStorageBuilder, path::Path as ObjectPath,
11};
12use reqwest;
13use serde::{Deserialize, Serialize};
14use serde_with::skip_serializing_none;
15use std::{
16 collections::HashMap, future::Future, path::Path, pin::Pin, str::FromStr, sync::Arc,
17 time::Instant,
18};
19use tokio::{fs::File, io::AsyncWriteExt};
20use tokio_util::sync::CancellationToken;
21use tracing::{info, warn};
22use voice_engine::CallOption;
23
/// Sending half of the unbounded tokio mpsc channel that carries [`CallRecord`] values.
pub type CallRecordSender = tokio::sync::mpsc::UnboundedSender<CallRecord>;
/// Receiving half of the unbounded tokio mpsc channel that carries [`CallRecord`] values.
pub type CallRecordReceiver = tokio::sync::mpsc::UnboundedReceiver<CallRecord>;
26
/// Shared, thread-safe callback that persists a single [`CallRecord`].
///
/// The callback receives, in order: a cancellation token, the formatter to use, the
/// call-record configuration, and the record itself (by value). It returns a boxed,
/// `Send` future that resolves to `Ok(())` on success or an error on failure.
pub type FnSaveCallRecord = Arc<
    Box<
        dyn Fn(
                CancellationToken,
                Arc<dyn CallRecordFormatter>,
                Arc<CallRecordConfig>,
                CallRecord,
            ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>
            + Send
            + Sync,
    >,
>;
39
/// Category tag attached to each [`CallRecordEvent`].
///
/// Serialized in camelCase (`event`, `command`, `sip`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum CallRecordEventType {
    Event,
    Command,
    Sip,
}
47
/// One entry of an event dump: a typed, timestamped JSON payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CallRecordEvent {
    /// Category of the entry.
    pub r#type: CallRecordEventType,
    /// Value of `voice_engine::media::get_timestamp()` at write time
    /// (unit not visible here — presumably milliseconds; TODO confirm).
    pub timestamp: u64,
    /// The payload, already serialized to a JSON string.
    pub content: String,
}
55
56impl CallRecordEvent {
57 pub async fn write<T: Serialize>(r#type: CallRecordEventType, obj: T, file: &mut File) {
58 let content = match serde_json::to_string(&obj) {
59 Ok(s) => s,
60 Err(_) => return,
61 };
62 let event = Self {
63 r#type,
64 timestamp: voice_engine::media::get_timestamp(),
65 content,
66 };
67 match serde_json::to_string(&event) {
68 Ok(line) => {
69 file.write_all(format!("{}\n", line).as_bytes()).await.ok();
70 }
71 Err(_) => {}
72 }
73 }
74}
75
/// Record describing one call, as handed to the call-record manager for persistence.
///
/// Field names are serialized in camelCase. `#[skip_serializing_none]` (from `serde_with`)
/// omits `Option` fields that are `None`, and the two `Vec` fields are omitted when empty.
#[skip_serializing_none]
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct CallRecord {
    /// Kind of call (type declared in `crate::call`).
    pub call_type: ActiveCallType,
    /// Optional call options (type declared in `voice_engine`).
    pub option: Option<CallOption>,
    /// Identifier of the call; used by the default formatter to build file names.
    pub call_id: String,
    /// Start of the call (UTC); the default formatter derives date-based paths from it.
    pub start_time: DateTime<Utc>,
    /// Time the call started ringing, if recorded.
    pub ring_time: Option<DateTime<Utc>>,
    /// Time the call was answered, if recorded.
    pub answer_time: Option<DateTime<Utc>>,
    /// End of the call (UTC).
    pub end_time: DateTime<Utc>,
    pub caller: String,
    pub callee: String,
    /// Numeric status code of the call (presumably a SIP status — TODO confirm).
    pub status_code: u16,
    pub hangup_reason: Option<CallRecordHangupReason>,
    /// Hangup messages; omitted from the output when empty, defaulted when missing on input.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    #[serde(default)]
    pub hangup_messages: Vec<CallRecordHangupMessage>,
    /// Recorded media files belonging to this call; omitted when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    #[serde(default)]
    pub recorder: Vec<CallRecordMedia>,
    /// Free-form additional JSON values.
    pub extras: Option<HashMap<String, serde_json::Value>>,
    /// Local path of the event dump file, if one was written.
    pub dump_event_file: Option<String>,
    /// Nested record (presumably for a referred/transferred call — TODO confirm).
    pub refer_callrecord: Option<Box<CallRecord>>,
}
101
/// Description of one recorded media file attached to a [`CallRecord`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CallRecordMedia {
    /// Identifier of the track the media belongs to; used in upload names.
    pub track_id: String,
    /// Local filesystem path of the media file (it is read and removed via this path).
    pub path: String,
    /// Size of the media (presumably in bytes — TODO confirm).
    pub size: u64,
    /// Optional free-form JSON values; omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub extra: Option<HashMap<String, serde_json::Value>>,
}
111
/// Why a call ended.
///
/// With serde the variants are encoded in camelCase (`byCaller`, `noAnswer`, ...). Note that
/// the string conversions implemented for this type elsewhere in this file use their own,
/// shorter names (for example `caller` for [`CallRecordHangupReason::ByCaller`]).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub enum CallRecordHangupReason {
    ByCaller,
    ByCallee,
    ByRefer,
    BySystem,
    Autohangup,
    NoAnswer,
    NoBalance,
    AnswerMachine,
    ServerUnavailable,
    Canceled,
    Rejected,
    Failed,
    /// Any reason that has no dedicated variant, carried as free text.
    Other(String),
}
129
/// One hangup message attached to a [`CallRecord`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CallRecordHangupMessage {
    /// Numeric code of the message (presumably a SIP status — TODO confirm).
    pub code: u16,
    /// Optional reason text; omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
    /// Optional target the message refers to; omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub target: Option<String>,
}
139
140impl FromStr for CallRecordHangupReason {
141 type Err = anyhow::Error;
142
143 fn from_str(s: &str) -> Result<Self, Self::Err> {
144 match s.to_lowercase().as_str() {
145 "caller" => Ok(Self::ByCaller),
146 "callee" => Ok(Self::ByCallee),
147 "refer" => Ok(Self::ByRefer),
148 "system" => Ok(Self::BySystem),
149 "autohangup" => Ok(Self::Autohangup),
150 "noAnswer" => Ok(Self::NoAnswer),
151 "noBalance" => Ok(Self::NoBalance),
152 "answerMachine" => Ok(Self::AnswerMachine),
153 "serverUnavailable" => Ok(Self::ServerUnavailable),
154 "canceled" => Ok(Self::Canceled),
155 "rejected" => Ok(Self::Rejected),
156 "failed" => Ok(Self::Failed),
157 _ => Ok(Self::Other(s.to_string())),
158 }
159 }
160}
161impl ToString for CallRecordHangupReason {
162 fn to_string(&self) -> String {
163 match self {
164 Self::ByCaller => "caller".to_string(),
165 Self::ByCallee => "callee".to_string(),
166 Self::ByRefer => "refer".to_string(),
167 Self::BySystem => "system".to_string(),
168 Self::Autohangup => "autohangup".to_string(),
169 Self::NoAnswer => "noAnswer".to_string(),
170 Self::NoBalance => "noBalance".to_string(),
171 Self::AnswerMachine => "answerMachine".to_string(),
172 Self::ServerUnavailable => "serverUnavailable".to_string(),
173 Self::Canceled => "canceled".to_string(),
174 Self::Rejected => "rejected".to_string(),
175 Self::Failed => "failed".to_string(),
176 Self::Other(s) => s.to_string(),
177 }
178 }
179}
180
181pub fn default_cdr_file_name(record: &CallRecord) -> String {
182 format!(
183 "{}_{}.json",
184 record.start_time.format("%Y%m%d-%H%M%S"),
185 record.call_id
186 )
187}
188
189pub trait CallRecordFormatter: Send + Sync {
190 fn format(&self, record: &CallRecord) -> Result<String> {
191 Ok(serde_json::to_string(record)?)
192 }
193 fn format_file_name(&self, record: &CallRecord) -> String;
194 fn format_dump_events_path(&self, record: &CallRecord) -> String;
195 fn format_media_path(&self, record: &CallRecord, media: &CallRecordMedia) -> String;
196}
197
/// Formatter that places every artifact below a common root and a per-day directory.
pub struct DefaultCallRecordFormatter {
    /// Root directory (or key prefix) prepended to generated paths; trailing `/` characters
    /// are trimmed when paths are built.
    pub root: String,
}
201
202impl Default for DefaultCallRecordFormatter {
203 fn default() -> Self {
204 Self {
205 root: "./config/cdr".to_string(),
206 }
207 }
208}
209
210impl DefaultCallRecordFormatter {
211 pub fn new_with_config(config: &CallRecordConfig) -> Self {
212 let root = match config {
213 CallRecordConfig::Local { root } => root.clone(),
214 CallRecordConfig::S3 { root, .. } => root.clone(),
215 _ => "./config/cdr".to_string(),
216 };
217 Self { root }
218 }
219}
220
221impl CallRecordFormatter for DefaultCallRecordFormatter {
222 fn format_file_name(&self, record: &CallRecord) -> String {
223 let trimmed_root = self.root.trim_end_matches('/');
224 let file_name = default_cdr_file_name(record);
225 if trimmed_root.is_empty() {
226 file_name
227 } else {
228 format!(
229 "{}/{}/{}",
230 trimmed_root,
231 record.start_time.format("%Y%m%d"),
232 file_name
233 )
234 }
235 }
236
237 fn format_dump_events_path(&self, record: &CallRecord) -> String {
238 format!(
239 "{}/{}/{}.jsonl",
240 self.root.trim_end_matches('/'),
241 record.start_time.format("%Y%m%d"),
242 record.call_id
243 )
244 }
245
246 fn format_media_path(&self, record: &CallRecord, media: &CallRecordMedia) -> String {
247 let file_name = Path::new(&media.path)
248 .file_name()
249 .unwrap_or_else(|| std::ffi::OsStr::new("unknown"))
250 .to_string_lossy()
251 .to_string();
252
253 format!(
254 "{}/{}/{}_{}_{}",
255 self.root.trim_end_matches('/'),
256 record.start_time.format("%Y%m%d"),
257 record.call_id,
258 media.track_id,
259 file_name
260 )
261 }
262}
263
264pub fn build_object_store_from_s3(
265 vendor: &S3Vendor,
266 bucket: &str,
267 region: &str,
268 access_key: &str,
269 secret_key: &str,
270 endpoint: &str,
271) -> Result<Arc<dyn ObjectStore>> {
272 let store: Arc<dyn ObjectStore> = match vendor {
273 S3Vendor::AWS => {
274 let builder = AmazonS3Builder::new()
275 .with_bucket_name(bucket)
276 .with_region(region)
277 .with_access_key_id(access_key)
278 .with_secret_access_key(secret_key);
279
280 let instance = if !endpoint.is_empty() {
281 builder.with_endpoint(endpoint).build()?
282 } else {
283 builder.build()?
284 };
285 Arc::new(instance)
286 }
287 S3Vendor::GCP => {
288 let instance = GoogleCloudStorageBuilder::new()
289 .with_bucket_name(bucket)
290 .with_service_account_key(secret_key)
291 .build()?;
292 Arc::new(instance)
293 }
294 S3Vendor::Azure => {
295 let instance = MicrosoftAzureBuilder::new()
296 .with_container_name(bucket)
297 .with_account(access_key)
298 .with_access_key(secret_key)
299 .build()?;
300 Arc::new(instance)
301 }
302 S3Vendor::Aliyun | S3Vendor::Tencent | S3Vendor::Minio | S3Vendor::DigitalOcean => {
303 let instance = AmazonS3Builder::new()
304 .with_bucket_name(bucket)
305 .with_region(region)
306 .with_access_key_id(access_key)
307 .with_secret_access_key(secret_key)
308 .with_endpoint(endpoint)
309 .with_virtual_hosted_style_request(false)
310 .build()?;
311 Arc::new(instance)
312 }
313 };
314
315 Ok(store)
316}
317
/// Receives [`CallRecord`]s from a channel and persists them through a saver callback.
///
/// Constructed via [`CallRecordManagerBuilder`].
pub struct CallRecordManager {
    /// Concurrency limit consulted by the receive loop when deciding how many records to
    /// take from the channel.
    pub max_concurrent: usize,
    /// Sending half of the channel; records sent here are picked up by the manager.
    pub sender: CallRecordSender,
    /// Storage configuration handed to the saver for every record.
    config: Arc<CallRecordConfig>,
    /// Token whose cancellation stops serving.
    cancel_token: CancellationToken,
    /// Receiving half of the record channel.
    receiver: CallRecordReceiver,
    /// Callback that actually stores a record.
    saver_fn: FnSaveCallRecord,
    /// Formatter handed to the saver for every record.
    formatter: Arc<dyn CallRecordFormatter>,
}
327
/// Builder for [`CallRecordManager`]; every field is optional and gets a default in `build`.
pub struct CallRecordManagerBuilder {
    /// Cancellation token to use; a fresh token is created when unset.
    pub cancel_token: Option<CancellationToken>,
    /// Storage configuration; the configuration type's `Default` is used when unset.
    pub config: Option<CallRecordConfig>,
    /// Concurrency limit; 64 when unset.
    pub max_concurrent: Option<usize>,
    /// Custom saver callback; the manager's built-in saver is used when unset.
    saver_fn: Option<FnSaveCallRecord>,
    /// Custom formatter; a `DefaultCallRecordFormatter` is used when unset.
    formatter: Option<Arc<dyn CallRecordFormatter>>,
}
335
336impl CallRecordManagerBuilder {
337 pub fn new() -> Self {
338 Self {
339 cancel_token: None,
340 config: None,
341 max_concurrent: None,
342 saver_fn: None,
343 formatter: None,
344 }
345 }
346
347 pub fn with_cancel_token(mut self, cancel_token: CancellationToken) -> Self {
348 self.cancel_token = Some(cancel_token);
349 self
350 }
351
352 pub fn with_config(mut self, config: CallRecordConfig) -> Self {
353 self.config = Some(config);
354 self
355 }
356
357 pub fn with_saver(mut self, saver: FnSaveCallRecord) -> Self {
358 self.saver_fn = Some(saver);
359 self
360 }
361
362 pub fn with_formatter(mut self, formatter: Arc<dyn CallRecordFormatter>) -> Self {
363 self.formatter = Some(formatter);
364 self
365 }
366
367 pub fn with_max_concurrent(mut self, max_concurrent: usize) -> Self {
368 self.max_concurrent = Some(max_concurrent);
369 self
370 }
371
372 pub fn build(self) -> CallRecordManager {
373 let cancel_token = self.cancel_token.unwrap_or_default();
374 let config = Arc::new(self.config.unwrap_or_default());
375 let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
376 let saver_fn = self
377 .saver_fn
378 .unwrap_or_else(|| Arc::new(Box::new(CallRecordManager::default_saver)));
379 let formatter = self
380 .formatter
381 .unwrap_or_else(|| Arc::new(DefaultCallRecordFormatter::default()));
382 let max_concurrent = self.max_concurrent.unwrap_or(64);
383
384 match config.as_ref() {
385 CallRecordConfig::Local { root } => {
386 if !Path::new(&root).exists() {
387 match std::fs::create_dir_all(&root) {
388 Ok(_) => {
389 info!("CallRecordManager created directory: {}", root);
390 }
391 Err(e) => {
392 warn!("CallRecordManager failed to create directory: {}", e);
393 }
394 }
395 }
396 }
397 _ => {}
398 }
399
400 CallRecordManager {
401 max_concurrent,
402 cancel_token,
403 sender,
404 receiver,
405 config,
406 saver_fn,
407 formatter,
408 }
409 }
410}
411
412impl CallRecordManager {
413 fn default_saver(
414 _cancel_token: CancellationToken,
415 formatter: Arc<dyn CallRecordFormatter>,
416 config: Arc<CallRecordConfig>,
417 record: CallRecord,
418 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
419 Box::pin(async move {
420 let mut record = record;
421 let start_time = Instant::now();
422 let result = match config.as_ref() {
423 CallRecordConfig::Local { .. } => {
424 Self::save_local_record(formatter.clone(), &mut record).await
425 }
426 CallRecordConfig::S3 {
427 vendor,
428 bucket,
429 region,
430 access_key,
431 secret_key,
432 endpoint,
433 with_media,
434 keep_media_copy,
435 ..
436 } => {
437 Self::save_with_s3_like(
438 formatter.clone(),
439 vendor,
440 bucket,
441 region,
442 access_key,
443 secret_key,
444 endpoint,
445 with_media,
446 keep_media_copy,
447 &record,
448 )
449 .await
450 }
451 CallRecordConfig::Http {
452 url,
453 headers,
454 with_media,
455 keep_media_copy,
456 } => {
457 Self::save_with_http(
458 formatter.clone(),
459 url,
460 headers,
461 with_media,
462 keep_media_copy,
463 &record,
464 )
465 .await
466 }
467 };
468 let file_name = match result {
469 Ok(file_name) => file_name,
470 Err(e) => {
471 warn!("Failed to save call record: {}", e);
472 return Err(e);
473 }
474 };
475 let elapsed = start_time.elapsed();
476 info!(
477 ?elapsed,
478 call_id = record.call_id,
479 file_name,
480 "CallRecordManager saved"
481 );
482 Ok(())
483 })
484 }
485
486 async fn save_local_record(
487 formatter: Arc<dyn CallRecordFormatter>,
488 record: &mut CallRecord,
489 ) -> Result<String> {
490 let file_content = formatter.format(record)?;
491 let file_name = formatter.format_file_name(record);
492
493 if let Some(parent) = Path::new(&file_name).parent() {
495 tokio::fs::create_dir_all(parent).await.map_err(|e| {
496 anyhow::anyhow!("Failed to create parent directory for {}: {}", file_name, e)
497 })?;
498 }
499
500 let mut file = File::create(&file_name).await.map_err(|e| {
501 anyhow::anyhow!("Failed to create call record file {}: {}", file_name, e)
502 })?;
503 file.write_all(file_content.as_bytes()).await?;
504 file.flush().await?;
505 Ok(file_name.to_string())
506 }
507
508 pub async fn save_with_http(
509 formatter: Arc<dyn CallRecordFormatter>,
510 url: &String,
511 headers: &Option<HashMap<String, String>>,
512 with_media: &Option<bool>,
513 keep_media_copy: &Option<bool>,
514 record: &CallRecord,
515 ) -> Result<String> {
516 let client = reqwest::Client::new();
517 let call_log_json = formatter.format(record)?;
519 let mut form = reqwest::multipart::Form::new().text("calllog.json", call_log_json);
521
522 if with_media.unwrap_or(false) {
524 for media in &record.recorder {
525 if std::path::Path::new(&media.path).exists() {
526 match tokio::fs::read(&media.path).await {
527 Ok(file_content) => {
528 let file_name = std::path::Path::new(&media.path)
529 .file_name()
530 .unwrap_or_else(|| std::ffi::OsStr::new("unknown"))
531 .to_string_lossy()
532 .to_string();
533
534 let part = match reqwest::multipart::Part::bytes(file_content)
535 .file_name(file_name.clone())
536 .mime_str("application/octet-stream")
537 {
538 Ok(part) => part,
539 Err(_) => {
540 reqwest::multipart::Part::bytes(
542 tokio::fs::read(&media.path).await?,
543 )
544 .file_name(file_name)
545 }
546 };
547
548 form = form.part(format!("media_{}", media.track_id), part);
549 }
550 Err(e) => {
551 warn!("Failed to read media file {}: {}", media.path, e);
552 }
553 }
554 }
555 }
556 if let Some(dump_events_file) = &record.dump_event_file {
557 if Path::new(&dump_events_file).exists() {
558 let file_name = Path::new(&dump_events_file)
559 .file_name()
560 .unwrap_or_else(|| std::ffi::OsStr::new("unknown"))
561 .to_string_lossy()
562 .to_string();
563 match reqwest::multipart::Part::bytes(tokio::fs::read(&dump_events_file).await?)
564 .file_name(file_name.clone())
565 .mime_str("application/octet-stream")
566 {
567 Ok(part) => {
568 form = form.part(format!("dump_events_{}", file_name), part);
569 }
570 Err(_) => {}
571 };
572 }
573 }
574 }
575 let mut request = client.post(url).multipart(form);
576 if let Some(headers_map) = headers {
577 for (key, value) in headers_map {
578 request = request.header(key, value);
579 }
580 }
581 let response = request.send().await?;
582 if response.status().is_success() {
583 let response_text = response.text().await.unwrap_or_default();
584
585 if keep_media_copy.unwrap_or(false) {
586 for media in &record.recorder {
587 let p = Path::new(&media.path);
588 if p.exists() {
589 tokio::fs::remove_file(p).await.ok();
590 }
591 }
592 }
593 Ok(format!("HTTP upload successful: {}", response_text))
594 } else {
595 Err(anyhow::anyhow!(
596 "HTTP upload failed with status: {} - {}",
597 response.status(),
598 response.text().await.unwrap_or_default()
599 ))
600 }
601 }
602
603 pub async fn save_with_s3_like(
604 formatter: Arc<dyn CallRecordFormatter>,
605 vendor: &S3Vendor,
606 bucket: &String,
607 region: &String,
608 access_key: &String,
609 secret_key: &String,
610 endpoint: &String,
611 with_media: &Option<bool>,
612 keep_media_copy: &Option<bool>,
613 record: &CallRecord,
614 ) -> Result<String> {
615 let start_time = Instant::now();
616 let object_store =
617 build_object_store_from_s3(vendor, bucket, region, access_key, secret_key, endpoint)?;
618
619 let call_log_json = formatter.format(record)?;
621 let filename = formatter.format_file_name(record);
623 let local_files = vec![filename.clone()];
624 let json_path = ObjectPath::from(filename);
625 let buf_size = call_log_json.len();
626 match object_store.put(&json_path, call_log_json.into()).await {
627 Ok(_) => {
628 info!(
629 elapsed = start_time.elapsed().as_secs_f64(),
630 %json_path,
631 buf_size,
632 "upload call record"
633 );
634 }
635 Err(e) => {
636 warn!(
637 %json_path,
638 "failed to upload call record: {}", e
639 );
640 }
641 }
642 if with_media.unwrap_or(false) {
644 let mut media_files = vec![];
645 for media in &record.recorder {
646 if Path::new(&media.path).exists() {
647 let media_path = ObjectPath::from(formatter.format_media_path(record, media));
648 media_files.push((media.path.clone(), media_path));
649 }
650 }
651 if let Some(dump_events_file) = &record.dump_event_file {
652 if Path::new(&dump_events_file).exists() {
653 let dump_events_path =
654 ObjectPath::from(formatter.format_dump_events_path(record));
655 media_files.push((dump_events_file.clone(), dump_events_path));
656 }
657 }
658 for (path, media_path) in &media_files {
659 let start_time = Instant::now();
660 let file_content = match tokio::fs::read(path).await {
661 Ok(file_content) => file_content,
662 Err(e) => {
663 warn!("failed to read media file {}: {}", path, e);
664 continue;
665 }
666 };
667 let buf_size = file_content.len();
668 match object_store.put(media_path, file_content.into()).await {
669 Ok(_) => {
670 info!(
671 elapsed = start_time.elapsed().as_secs_f64(),
672 %media_path,
673 buf_size,
674 "upload media file"
675 );
676 }
677 Err(e) => {
678 warn!(%media_path,"failed to upload media file: {}", e);
679 }
680 }
681 }
682 }
683 if !keep_media_copy.unwrap_or(false) {
685 for media in &record.recorder {
686 let p = Path::new(&media.path);
687 if p.exists() {
688 tokio::fs::remove_file(p).await.ok();
689 }
690 }
691 for file_name in &local_files {
692 let p = Path::new(file_name);
693 if p.exists() {
694 tokio::fs::remove_file(p).await.ok();
695 }
696 }
697 }
698
699 Ok(format!(
700 "{}/{}",
701 endpoint.trim_end_matches('/'),
702 json_path.to_string().trim_start_matches('/')
703 ))
704 }
705
706 pub async fn serve(&mut self) {
707 let token = self.cancel_token.clone();
708 info!("CallRecordManager serving");
709 tokio::select! {
710 _ = token.cancelled() => {}
711 _ = self.recv_loop() => {}
712 }
713 info!("CallRecordManager served");
714 }
715
716 async fn recv_loop(&mut self) -> Result<()> {
717 let mut futures = FuturesUnordered::new();
718 loop {
719 let limit = self.max_concurrent - futures.len();
720 if limit == 0 {
721 if let Some(_) = futures.next().await {}
722 continue;
723 }
724 let mut buffer = Vec::with_capacity(limit);
725 if self.receiver.recv_many(&mut buffer, limit).await == 0 {
726 break;
727 }
728
729 for record in buffer {
730 let cancel_token_ref = self.cancel_token.clone();
731 let save_fn_ref = self.saver_fn.clone();
732 let config_ref = self.config.clone();
733 let formatter_ref = self.formatter.clone();
734
735 futures.push(async move {
736 if let Err(e) =
737 save_fn_ref(cancel_token_ref, formatter_ref, config_ref, record).await
738 {
739 warn!("Failed to save call record: {}", e);
740 }
741 });
742 }
743 while let Some(_) = futures.next().await {}
744 }
745 Ok(())
746 }
747}