1use crate::CallOption;
2use crate::{
3 call::ActiveCallType,
4 config::{CallRecordConfig, S3Vendor},
5};
6use anyhow::Result;
7use chrono::{DateTime, Utc};
8use futures::stream::{FuturesUnordered, StreamExt};
9use object_store::PutPayload;
10use object_store::{
11 ObjectStore, ObjectStoreExt, aws::AmazonS3Builder, azure::MicrosoftAzureBuilder,
12 gcp::GoogleCloudStorageBuilder, path::Path as ObjectPath,
13};
14use reqwest;
15use serde::{Deserialize, Serialize};
16use serde_with::skip_serializing_none;
17use std::{
18 collections::HashMap, future::Future, path::Path, pin::Pin, str::FromStr, sync::Arc,
19 time::Instant,
20};
21use tokio::{fs::File, io::AsyncWriteExt};
22use tokio_util::sync::CancellationToken;
23use tracing::{info, warn};
24
25pub type CallRecordSender = tokio::sync::mpsc::UnboundedSender<CallRecord>;
26pub type CallRecordReceiver = tokio::sync::mpsc::UnboundedReceiver<CallRecord>;
27
28pub type FnSaveCallRecord = Arc<
29 Box<
30 dyn Fn(
31 CancellationToken,
32 Arc<dyn CallRecordFormatter>,
33 Arc<CallRecordConfig>,
34 CallRecord,
35 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>
36 + Send
37 + Sync,
38 >,
39>;
40
41#[derive(Debug, Clone, Serialize, Deserialize)]
42#[serde(rename_all = "camelCase")]
43pub enum CallRecordEventType {
44 Event,
45 Command,
46 Sip,
47}
48
49#[derive(Debug, Clone, Serialize, Deserialize)]
50#[serde(rename_all = "camelCase")]
51pub struct CallRecordEvent {
52 pub r#type: CallRecordEventType,
53 pub timestamp: u64,
54 pub content: String,
55}
56
57impl CallRecordEvent {
58 pub async fn write<T: Serialize>(r#type: CallRecordEventType, obj: T, file: &mut File) {
59 let content = match serde_json::to_string(&obj) {
60 Ok(s) => s,
61 Err(_) => return,
62 };
63 let event = Self {
64 r#type,
65 timestamp: crate::media::get_timestamp(),
66 content,
67 };
68 match serde_json::to_string(&event) {
69 Ok(line) => {
70 file.write_all(format!("{}\n", line).as_bytes()).await.ok();
71 }
72 Err(_) => {}
73 }
74 }
75}
76
77#[skip_serializing_none]
78#[derive(Debug, Clone, Serialize, Deserialize, Default)]
79#[serde(rename_all = "camelCase")]
80pub struct CallRecord {
81 pub call_type: ActiveCallType,
82 pub option: Option<CallOption>,
83 pub call_id: String,
84 pub start_time: DateTime<Utc>,
85 pub ring_time: Option<DateTime<Utc>>,
86 pub answer_time: Option<DateTime<Utc>>,
87 pub end_time: DateTime<Utc>,
88 pub caller: String,
89 pub callee: String,
90 pub status_code: u16,
91 pub hangup_reason: Option<CallRecordHangupReason>,
92 #[serde(skip_serializing_if = "Vec::is_empty")]
93 #[serde(default)]
94 pub hangup_messages: Vec<CallRecordHangupMessage>,
95 #[serde(skip_serializing_if = "Vec::is_empty")]
96 #[serde(default)]
97 pub recorder: Vec<CallRecordMedia>,
98 pub extras: Option<HashMap<String, serde_json::Value>>,
99 pub dump_event_file: Option<String>,
100 pub refer_callrecord: Option<Box<CallRecord>>,
101}
102
103#[derive(Debug, Clone, Serialize, Deserialize)]
104#[serde(rename_all = "camelCase")]
105pub struct CallRecordMedia {
106 pub track_id: String,
107 pub path: String,
108 pub size: u64,
109 #[serde(skip_serializing_if = "Option::is_none")]
110 pub extra: Option<HashMap<String, serde_json::Value>>,
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
114#[serde(rename_all = "camelCase")]
115pub enum CallRecordHangupReason {
116 ByCaller,
117 ByCallee,
118 ByRefer,
119 BySystem,
120 Autohangup,
121 InactivityTimeout,
122 NoAnswer,
123 NoBalance,
124 AnswerMachine,
125 ServerUnavailable,
126 Canceled,
127 Rejected,
128 Failed,
129 Other(String),
130}
131
132#[derive(Debug, Clone, Serialize, Deserialize)]
133#[serde(rename_all = "camelCase")]
134pub struct CallRecordHangupMessage {
135 pub code: u16,
136 #[serde(skip_serializing_if = "Option::is_none")]
137 pub reason: Option<String>,
138 #[serde(skip_serializing_if = "Option::is_none")]
139 pub target: Option<String>,
140}
141
142impl FromStr for CallRecordHangupReason {
143 type Err = anyhow::Error;
144
145 fn from_str(s: &str) -> Result<Self, Self::Err> {
146 match s.to_lowercase().as_str() {
147 "caller" => Ok(Self::ByCaller),
148 "callee" => Ok(Self::ByCallee),
149 "refer" => Ok(Self::ByRefer),
150 "system" => Ok(Self::BySystem),
151 "autohangup" => Ok(Self::Autohangup),
152 "inactivitytimeout" => Ok(Self::InactivityTimeout),
153 "noAnswer" => Ok(Self::NoAnswer),
154 "noBalance" => Ok(Self::NoBalance),
155 "answerMachine" => Ok(Self::AnswerMachine),
156 "serverUnavailable" => Ok(Self::ServerUnavailable),
157 "canceled" => Ok(Self::Canceled),
158 "rejected" => Ok(Self::Rejected),
159 "failed" => Ok(Self::Failed),
160 _ => Ok(Self::Other(s.to_string())),
161 }
162 }
163}
164impl ToString for CallRecordHangupReason {
165 fn to_string(&self) -> String {
166 match self {
167 Self::ByCaller => "caller".to_string(),
168 Self::ByCallee => "callee".to_string(),
169 Self::ByRefer => "refer".to_string(),
170 Self::BySystem => "system".to_string(),
171 Self::Autohangup => "autohangup".to_string(),
172 Self::InactivityTimeout => "inactivityTimeout".to_string(),
173 Self::NoAnswer => "noAnswer".to_string(),
174 Self::NoBalance => "noBalance".to_string(),
175 Self::AnswerMachine => "answerMachine".to_string(),
176 Self::ServerUnavailable => "serverUnavailable".to_string(),
177 Self::Canceled => "canceled".to_string(),
178 Self::Rejected => "rejected".to_string(),
179 Self::Failed => "failed".to_string(),
180 Self::Other(s) => s.to_string(),
181 }
182 }
183}
184
185pub fn default_cdr_file_name(record: &CallRecord) -> String {
186 format!(
187 "{}_{}.json",
188 record.start_time.format("%Y%m%d-%H%M%S"),
189 record.call_id
190 )
191}
192
193pub trait CallRecordFormatter: Send + Sync {
194 fn format(&self, record: &CallRecord) -> Result<String> {
195 Ok(serde_json::to_string(record)?)
196 }
197 fn format_file_name(&self, record: &CallRecord) -> String;
198 fn format_dump_events_path(&self, record: &CallRecord) -> String;
199 fn format_media_path(&self, record: &CallRecord, media: &CallRecordMedia) -> String;
200}
201
202pub struct DefaultCallRecordFormatter {
203 pub root: String,
204}
205
206impl Default for DefaultCallRecordFormatter {
207 fn default() -> Self {
208 Self {
209 root: "./config/cdr".to_string(),
210 }
211 }
212}
213
214impl DefaultCallRecordFormatter {
215 pub fn new_with_config(config: &CallRecordConfig) -> Self {
216 let root = match config {
217 CallRecordConfig::Local { root } => root.clone(),
218 CallRecordConfig::S3 { root, .. } => root.clone(),
219 _ => "./config/cdr".to_string(),
220 };
221 Self { root }
222 }
223}
224
225impl CallRecordFormatter for DefaultCallRecordFormatter {
226 fn format_file_name(&self, record: &CallRecord) -> String {
227 let trimmed_root = self.root.trim_end_matches('/');
228 let file_name = default_cdr_file_name(record);
229 if trimmed_root.is_empty() {
230 file_name
231 } else {
232 format!(
233 "{}/{}/{}",
234 trimmed_root,
235 record.start_time.format("%Y%m%d"),
236 file_name
237 )
238 }
239 }
240
241 fn format_dump_events_path(&self, record: &CallRecord) -> String {
242 format!(
243 "{}/{}/{}.jsonl",
244 self.root.trim_end_matches('/'),
245 record.start_time.format("%Y%m%d"),
246 record.call_id
247 )
248 }
249
250 fn format_media_path(&self, record: &CallRecord, media: &CallRecordMedia) -> String {
251 let file_name = Path::new(&media.path)
252 .file_name()
253 .unwrap_or_else(|| std::ffi::OsStr::new("unknown"))
254 .to_string_lossy()
255 .to_string();
256
257 format!(
258 "{}/{}/{}_{}_{}",
259 self.root.trim_end_matches('/'),
260 record.start_time.format("%Y%m%d"),
261 record.call_id,
262 media.track_id,
263 file_name
264 )
265 }
266}
267
268pub fn build_object_store_from_s3(
269 vendor: &S3Vendor,
270 bucket: &str,
271 region: &str,
272 access_key: &str,
273 secret_key: &str,
274 endpoint: &str,
275) -> Result<Arc<dyn ObjectStore>> {
276 let store: Arc<dyn ObjectStore> = match vendor {
277 S3Vendor::AWS => {
278 let builder = AmazonS3Builder::new()
279 .with_bucket_name(bucket)
280 .with_region(region)
281 .with_access_key_id(access_key)
282 .with_secret_access_key(secret_key);
283
284 let instance = if !endpoint.is_empty() {
285 builder.with_endpoint(endpoint).build()?
286 } else {
287 builder.build()?
288 };
289 Arc::new(instance)
290 }
291 S3Vendor::GCP => {
292 let instance = GoogleCloudStorageBuilder::new()
293 .with_bucket_name(bucket)
294 .with_service_account_key(secret_key)
295 .build()?;
296 Arc::new(instance)
297 }
298 S3Vendor::Azure => {
299 let instance = MicrosoftAzureBuilder::new()
300 .with_container_name(bucket)
301 .with_account(access_key)
302 .with_access_key(secret_key)
303 .build()?;
304 Arc::new(instance)
305 }
306 S3Vendor::Aliyun | S3Vendor::Tencent | S3Vendor::Minio | S3Vendor::DigitalOcean => {
307 let instance = AmazonS3Builder::new()
308 .with_bucket_name(bucket)
309 .with_region(region)
310 .with_access_key_id(access_key)
311 .with_secret_access_key(secret_key)
312 .with_endpoint(endpoint)
313 .with_virtual_hosted_style_request(false)
314 .build()?;
315 Arc::new(instance)
316 }
317 };
318
319 Ok(store)
320}
321
322pub struct CallRecordManager {
323 pub max_concurrent: usize,
324 pub sender: CallRecordSender,
325 config: Arc<CallRecordConfig>,
326 cancel_token: CancellationToken,
327 receiver: CallRecordReceiver,
328 saver_fn: FnSaveCallRecord,
329 formatter: Arc<dyn CallRecordFormatter>,
330}
331
332pub struct CallRecordManagerBuilder {
333 pub cancel_token: Option<CancellationToken>,
334 pub config: Option<CallRecordConfig>,
335 pub max_concurrent: Option<usize>,
336 saver_fn: Option<FnSaveCallRecord>,
337 formatter: Option<Arc<dyn CallRecordFormatter>>,
338}
339
340impl CallRecordManagerBuilder {
341 pub fn new() -> Self {
342 Self {
343 cancel_token: None,
344 config: None,
345 max_concurrent: None,
346 saver_fn: None,
347 formatter: None,
348 }
349 }
350
351 pub fn with_cancel_token(mut self, cancel_token: CancellationToken) -> Self {
352 self.cancel_token = Some(cancel_token);
353 self
354 }
355
356 pub fn with_config(mut self, config: CallRecordConfig) -> Self {
357 self.config = Some(config);
358 self
359 }
360
361 pub fn with_saver(mut self, saver: FnSaveCallRecord) -> Self {
362 self.saver_fn = Some(saver);
363 self
364 }
365
366 pub fn with_formatter(mut self, formatter: Arc<dyn CallRecordFormatter>) -> Self {
367 self.formatter = Some(formatter);
368 self
369 }
370
371 pub fn with_max_concurrent(mut self, max_concurrent: usize) -> Self {
372 self.max_concurrent = Some(max_concurrent);
373 self
374 }
375
376 pub fn build(self) -> CallRecordManager {
377 let cancel_token = self.cancel_token.unwrap_or_default();
378 let config = Arc::new(self.config.unwrap_or_default());
379 let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
380 let saver_fn = self
381 .saver_fn
382 .unwrap_or_else(|| Arc::new(Box::new(CallRecordManager::default_saver)));
383 let formatter = self
384 .formatter
385 .unwrap_or_else(|| Arc::new(DefaultCallRecordFormatter::default()));
386 let max_concurrent = self.max_concurrent.unwrap_or(64);
387
388 match config.as_ref() {
389 CallRecordConfig::Local { root } => {
390 if !Path::new(&root).exists() {
391 match std::fs::create_dir_all(&root) {
392 Ok(_) => {
393 info!("CallRecordManager created directory: {}", root);
394 }
395 Err(e) => {
396 warn!("CallRecordManager failed to create directory: {}", e);
397 }
398 }
399 }
400 }
401 _ => {}
402 }
403
404 CallRecordManager {
405 max_concurrent,
406 cancel_token,
407 sender,
408 receiver,
409 config,
410 saver_fn,
411 formatter,
412 }
413 }
414}
415
416impl CallRecordManager {
417 fn default_saver(
418 _cancel_token: CancellationToken,
419 formatter: Arc<dyn CallRecordFormatter>,
420 config: Arc<CallRecordConfig>,
421 record: CallRecord,
422 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
423 Box::pin(async move {
424 let mut record = record;
425 let start_time = Instant::now();
426 let result = match config.as_ref() {
427 CallRecordConfig::Local { .. } => {
428 Self::save_local_record(formatter.clone(), &mut record).await
429 }
430 CallRecordConfig::S3 {
431 vendor,
432 bucket,
433 region,
434 access_key,
435 secret_key,
436 endpoint,
437 with_media,
438 keep_media_copy,
439 ..
440 } => {
441 Self::save_with_s3_like(
442 formatter.clone(),
443 vendor,
444 bucket,
445 region,
446 access_key,
447 secret_key,
448 endpoint,
449 with_media,
450 keep_media_copy,
451 &record,
452 )
453 .await
454 }
455 CallRecordConfig::Http {
456 url,
457 headers,
458 with_media,
459 keep_media_copy,
460 } => {
461 Self::save_with_http(
462 formatter.clone(),
463 url,
464 headers,
465 with_media,
466 keep_media_copy,
467 &record,
468 )
469 .await
470 }
471 };
472 let file_name = match result {
473 Ok(file_name) => file_name,
474 Err(e) => {
475 warn!("Failed to save call record: {}", e);
476 return Err(e);
477 }
478 };
479 let elapsed = start_time.elapsed();
480 info!(
481 ?elapsed,
482 call_id = record.call_id,
483 file_name,
484 "CallRecordManager saved"
485 );
486 Ok(())
487 })
488 }
489
490 async fn save_local_record(
491 formatter: Arc<dyn CallRecordFormatter>,
492 record: &mut CallRecord,
493 ) -> Result<String> {
494 let file_content = formatter.format(record)?;
495 let file_name = formatter.format_file_name(record);
496
497 if let Some(parent) = Path::new(&file_name).parent() {
499 tokio::fs::create_dir_all(parent).await.map_err(|e| {
500 anyhow::anyhow!("Failed to create parent directory for {}: {}", file_name, e)
501 })?;
502 }
503
504 let mut file = File::create(&file_name).await.map_err(|e| {
505 anyhow::anyhow!("Failed to create call record file {}: {}", file_name, e)
506 })?;
507 file.write_all(file_content.as_bytes()).await?;
508 file.flush().await?;
509 Ok(file_name.to_string())
510 }
511
512 pub async fn save_with_http(
513 formatter: Arc<dyn CallRecordFormatter>,
514 url: &String,
515 headers: &Option<HashMap<String, String>>,
516 with_media: &Option<bool>,
517 keep_media_copy: &Option<bool>,
518 record: &CallRecord,
519 ) -> Result<String> {
520 let client = reqwest::Client::new();
521 let call_log_json = formatter.format(record)?;
523 let mut form = reqwest::multipart::Form::new().text("calllog.json", call_log_json);
525
526 if with_media.unwrap_or(false) {
528 for media in &record.recorder {
529 if std::path::Path::new(&media.path).exists() {
530 match tokio::fs::read(&media.path).await {
531 Ok(file_content) => {
532 let file_name = std::path::Path::new(&media.path)
533 .file_name()
534 .unwrap_or_else(|| std::ffi::OsStr::new("unknown"))
535 .to_string_lossy()
536 .to_string();
537
538 let part = match reqwest::multipart::Part::bytes(file_content)
539 .file_name(file_name.clone())
540 .mime_str("application/octet-stream")
541 {
542 Ok(part) => part,
543 Err(_) => {
544 reqwest::multipart::Part::bytes(
546 tokio::fs::read(&media.path).await?,
547 )
548 .file_name(file_name)
549 }
550 };
551
552 form = form.part(format!("media_{}", media.track_id), part);
553 }
554 Err(e) => {
555 warn!("Failed to read media file {}: {}", media.path, e);
556 }
557 }
558 }
559 }
560 if let Some(dump_events_file) = &record.dump_event_file {
561 if Path::new(&dump_events_file).exists() {
562 let file_name = Path::new(&dump_events_file)
563 .file_name()
564 .unwrap_or_else(|| std::ffi::OsStr::new("unknown"))
565 .to_string_lossy()
566 .to_string();
567 match reqwest::multipart::Part::bytes(tokio::fs::read(&dump_events_file).await?)
568 .file_name(file_name.clone())
569 .mime_str("application/octet-stream")
570 {
571 Ok(part) => {
572 form = form.part(format!("dump_events_{}", file_name), part);
573 }
574 Err(_) => {}
575 };
576 }
577 }
578 }
579 let mut request = client.post(url).multipart(form);
580 if let Some(headers_map) = headers {
581 for (key, value) in headers_map {
582 request = request.header(key, value);
583 }
584 }
585 let response = request.send().await?;
586 if response.status().is_success() {
587 let response_text = response.text().await.unwrap_or_default();
588
589 if keep_media_copy.unwrap_or(false) {
590 for media in &record.recorder {
591 let p = Path::new(&media.path);
592 if p.exists() {
593 tokio::fs::remove_file(p).await.ok();
594 }
595 }
596 }
597 Ok(format!("HTTP upload successful: {}", response_text))
598 } else {
599 Err(anyhow::anyhow!(
600 "HTTP upload failed with status: {} - {}",
601 response.status(),
602 response.text().await.unwrap_or_default()
603 ))
604 }
605 }
606
607 pub async fn save_with_s3_like(
608 formatter: Arc<dyn CallRecordFormatter>,
609 vendor: &S3Vendor,
610 bucket: &String,
611 region: &String,
612 access_key: &String,
613 secret_key: &String,
614 endpoint: &String,
615 with_media: &Option<bool>,
616 keep_media_copy: &Option<bool>,
617 record: &CallRecord,
618 ) -> Result<String> {
619 let start_time = Instant::now();
620 let object_store =
621 build_object_store_from_s3(vendor, bucket, region, access_key, secret_key, endpoint)?;
622
623 let call_log_json = formatter.format(record)?;
625 let filename = formatter.format_file_name(record);
627 let local_files = vec![filename.clone()];
628 let json_path = ObjectPath::from(filename);
629 let buf_size = call_log_json.len();
630 match object_store
631 .put(&json_path, PutPayload::from(call_log_json))
632 .await
633 {
634 Ok(_) => {
635 info!(
636 elapsed = start_time.elapsed().as_secs_f64(),
637 %json_path,
638 buf_size,
639 "upload call record"
640 );
641 }
642 Err(e) => {
643 warn!(
644 %json_path,
645 "failed to upload call record: {}", e
646 );
647 }
648 }
649 if with_media.unwrap_or(false) {
651 let mut media_files = vec![];
652 for media in &record.recorder {
653 if Path::new(&media.path).exists() {
654 let media_path = ObjectPath::from(formatter.format_media_path(record, media));
655 media_files.push((media.path.clone(), media_path));
656 }
657 }
658 if let Some(dump_events_file) = &record.dump_event_file {
659 if Path::new(&dump_events_file).exists() {
660 let dump_events_path =
661 ObjectPath::from(formatter.format_dump_events_path(record));
662 media_files.push((dump_events_file.clone(), dump_events_path));
663 }
664 }
665 for (path, media_path) in &media_files {
666 let start_time = Instant::now();
667 let file_content = match tokio::fs::read(path).await {
668 Ok(file_content) => file_content,
669 Err(e) => {
670 warn!("failed to read media file {}: {}", path, e);
671 continue;
672 }
673 };
674 let buf_size = file_content.len();
675 match object_store
676 .put(media_path, PutPayload::from(file_content))
677 .await
678 {
679 Ok(_) => {
680 info!(
681 elapsed = start_time.elapsed().as_secs_f64(),
682 %media_path,
683 buf_size,
684 "upload media file"
685 );
686 }
687 Err(e) => {
688 warn!(%media_path,"failed to upload media file: {}", e);
689 }
690 }
691 }
692 }
693 if !keep_media_copy.unwrap_or(false) {
695 for media in &record.recorder {
696 let p = Path::new(&media.path);
697 if p.exists() {
698 tokio::fs::remove_file(p).await.ok();
699 }
700 }
701 for file_name in &local_files {
702 let p = Path::new(file_name);
703 if p.exists() {
704 tokio::fs::remove_file(p).await.ok();
705 }
706 }
707 }
708
709 Ok(format!(
710 "{}/{}",
711 endpoint.trim_end_matches('/'),
712 json_path.to_string().trim_start_matches('/')
713 ))
714 }
715
716 pub async fn serve(&mut self) {
717 let token = self.cancel_token.clone();
718 info!("CallRecordManager serving");
719 tokio::select! {
720 _ = token.cancelled() => {}
721 _ = self.recv_loop() => {}
722 }
723 info!("CallRecordManager served");
724 }
725
726 async fn recv_loop(&mut self) -> Result<()> {
727 let mut futures = FuturesUnordered::new();
728 loop {
729 let limit = self.max_concurrent - futures.len();
730 if limit == 0 {
731 if let Some(_) = futures.next().await {}
732 continue;
733 }
734 let mut buffer = Vec::with_capacity(limit);
735 if self.receiver.recv_many(&mut buffer, limit).await == 0 {
736 break;
737 }
738
739 for record in buffer {
740 let cancel_token_ref = self.cancel_token.clone();
741 let save_fn_ref = self.saver_fn.clone();
742 let config_ref = self.config.clone();
743 let formatter_ref = self.formatter.clone();
744
745 futures.push(async move {
746 if let Err(e) =
747 save_fn_ref(cancel_token_ref, formatter_ref, config_ref, record).await
748 {
749 warn!("Failed to save call record: {}", e);
750 }
751 });
752 }
753 while let Some(_) = futures.next().await {}
754 }
755 Ok(())
756 }
757}