active_call/callrecord/
mod.rs

1use crate::{
2    call::ActiveCallType,
3    config::{CallRecordConfig, S3Vendor},
4};
5use anyhow::Result;
6use chrono::{DateTime, Utc};
7use futures::stream::{FuturesUnordered, StreamExt};
8use object_store::{
9    ObjectStore, aws::AmazonS3Builder, azure::MicrosoftAzureBuilder,
10    gcp::GoogleCloudStorageBuilder, path::Path as ObjectPath,
11};
12use reqwest;
13use serde::{Deserialize, Serialize};
14use serde_with::skip_serializing_none;
15use std::{
16    collections::HashMap, future::Future, path::Path, pin::Pin, str::FromStr, sync::Arc,
17    time::Instant,
18};
19use tokio::{fs::File, io::AsyncWriteExt};
20use tokio_util::sync::CancellationToken;
21use tracing::{info, warn};
22use voice_engine::CallOption;
23
/// Sending half of the unbounded channel that carries [`CallRecord`] values.
pub type CallRecordSender = tokio::sync::mpsc::UnboundedSender<CallRecord>;
/// Receiving half of the unbounded channel that carries [`CallRecord`] values.
pub type CallRecordReceiver = tokio::sync::mpsc::UnboundedReceiver<CallRecord>;

/// Shared, thread-safe callback that persists one [`CallRecord`].
///
/// The callback is handed a cancellation token, the formatter, the
/// configuration and the record, and returns a boxed `Send` future that
/// resolves to `Result<()>`.
pub type FnSaveCallRecord = Arc<
    Box<
        dyn Fn(
                CancellationToken,
                Arc<dyn CallRecordFormatter>,
                Arc<CallRecordConfig>,
                CallRecord,
            ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>
            + Send
            + Sync,
    >,
>;
39
/// Category tag stored on every [`CallRecordEvent`].
///
/// Variant names are serialized in camelCase (`event`, `command`, `sip`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum CallRecordEventType {
    Event,
    Command,
    Sip,
}
47
/// One entry of a call's event dump; field names are serialized in camelCase.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CallRecordEvent {
    /// Category of the entry.
    pub r#type: CallRecordEventType,
    /// Timestamp of the entry. `CallRecordEvent::write` fills it from
    /// `voice_engine::media::get_timestamp()` (NOTE(review): unit not visible
    /// in this file — confirm).
    pub timestamp: u64,
    /// Payload text. `CallRecordEvent::write` stores the JSON encoding of the
    /// object it was given.
    pub content: String,
}
55
56impl CallRecordEvent {
57    pub async fn write<T: Serialize>(r#type: CallRecordEventType, obj: T, file: &mut File) {
58        let content = match serde_json::to_string(&obj) {
59            Ok(s) => s,
60            Err(_) => return,
61        };
62        let event = Self {
63            r#type,
64            timestamp: voice_engine::media::get_timestamp(),
65            content,
66        };
67        match serde_json::to_string(&event) {
68            Ok(line) => {
69                file.write_all(format!("{}\n", line).as_bytes()).await.ok();
70            }
71            Err(_) => {}
72        }
73    }
74}
75
/// Call detail record that the savers in this module serialize and persist.
///
/// Field names are serialized in camelCase and `None` options are omitted
/// from the serialized output (`skip_serializing_none`).
#[skip_serializing_none]
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct CallRecord {
    /// Kind of call.
    pub call_type: ActiveCallType,
    /// Call options, when available.
    pub option: Option<CallOption>,
    /// Call identifier; also embedded in the generated file and object names.
    pub call_id: String,
    /// Start of the call (UTC); drives the timestamp and date directory of
    /// the generated paths.
    pub start_time: DateTime<Utc>,
    /// Ring time (UTC), when recorded.
    pub ring_time: Option<DateTime<Utc>>,
    /// Answer time (UTC), when recorded.
    pub answer_time: Option<DateTime<Utc>>,
    /// End of the call (UTC).
    pub end_time: DateTime<Utc>,
    /// Calling party.
    pub caller: String,
    /// Called party.
    pub callee: String,
    /// Numeric status code.
    /// NOTE(review): presumably the final SIP response code — confirm.
    pub status_code: u16,
    /// Why the call ended, when known.
    pub hangup_reason: Option<CallRecordHangupReason>,
    /// Hangup messages; omitted from the output when empty and defaulted to
    /// empty when missing from the input.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    #[serde(default)]
    pub hangup_messages: Vec<CallRecordHangupMessage>,
    /// Recorded media files; omitted from the output when empty and defaulted
    /// to empty when missing from the input.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    #[serde(default)]
    pub recorder: Vec<CallRecordMedia>,
    /// Free-form extra values keyed by name.
    pub extras: Option<HashMap<String, serde_json::Value>>,
    /// Local path of the event dump file; the HTTP and S3 savers upload it
    /// together with the media when media upload is enabled.
    pub dump_event_file: Option<String>,
    /// Nested call record.
    /// NOTE(review): presumably the record of a referred call — confirm.
    pub refer_callrecord: Option<Box<CallRecord>>,
}
101
/// Description of one recorded media file attached to a [`CallRecord`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CallRecordMedia {
    /// Track identifier; used in the multipart field name and in the object
    /// path when the file is uploaded.
    pub track_id: String,
    /// Local path of the media file; the savers read (and may delete) it.
    pub path: String,
    /// Size of the media (NOTE(review): presumably in bytes — confirm).
    pub size: u64,
    /// Free-form extra values; omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub extra: Option<HashMap<String, serde_json::Value>>,
}
111
/// Reason a call ended.
///
/// Variant names are serialized in camelCase by serde. The `FromStr` and
/// `ToString` implementations in this file use their own short names
/// (`caller`, `callee`, `refer`, `system`, ...) instead.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub enum CallRecordHangupReason {
    ByCaller,
    ByCallee,
    ByRefer,
    BySystem,
    Autohangup,
    NoAnswer,
    NoBalance,
    AnswerMachine,
    ServerUnavailable,
    Canceled,
    Rejected,
    Failed,
    /// Any reason not covered by the variants above, kept as free text.
    Other(String),
}
129
/// One hangup message stored in `CallRecord::hangup_messages`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CallRecordHangupMessage {
    /// Numeric code of the message.
    pub code: u16,
    /// Optional reason text; omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
    /// Optional target; omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub target: Option<String>,
}
139
140impl FromStr for CallRecordHangupReason {
141    type Err = anyhow::Error;
142
143    fn from_str(s: &str) -> Result<Self, Self::Err> {
144        match s.to_lowercase().as_str() {
145            "caller" => Ok(Self::ByCaller),
146            "callee" => Ok(Self::ByCallee),
147            "refer" => Ok(Self::ByRefer),
148            "system" => Ok(Self::BySystem),
149            "autohangup" => Ok(Self::Autohangup),
150            "noAnswer" => Ok(Self::NoAnswer),
151            "noBalance" => Ok(Self::NoBalance),
152            "answerMachine" => Ok(Self::AnswerMachine),
153            "serverUnavailable" => Ok(Self::ServerUnavailable),
154            "canceled" => Ok(Self::Canceled),
155            "rejected" => Ok(Self::Rejected),
156            "failed" => Ok(Self::Failed),
157            _ => Ok(Self::Other(s.to_string())),
158        }
159    }
160}
161impl ToString for CallRecordHangupReason {
162    fn to_string(&self) -> String {
163        match self {
164            Self::ByCaller => "caller".to_string(),
165            Self::ByCallee => "callee".to_string(),
166            Self::ByRefer => "refer".to_string(),
167            Self::BySystem => "system".to_string(),
168            Self::Autohangup => "autohangup".to_string(),
169            Self::NoAnswer => "noAnswer".to_string(),
170            Self::NoBalance => "noBalance".to_string(),
171            Self::AnswerMachine => "answerMachine".to_string(),
172            Self::ServerUnavailable => "serverUnavailable".to_string(),
173            Self::Canceled => "canceled".to_string(),
174            Self::Rejected => "rejected".to_string(),
175            Self::Failed => "failed".to_string(),
176            Self::Other(s) => s.to_string(),
177        }
178    }
179}
180
181pub fn default_cdr_file_name(record: &CallRecord) -> String {
182    format!(
183        "{}_{}.json",
184        record.start_time.format("%Y%m%d-%H%M%S"),
185        record.call_id
186    )
187}
188
189pub trait CallRecordFormatter: Send + Sync {
190    fn format(&self, record: &CallRecord) -> Result<String> {
191        Ok(serde_json::to_string(record)?)
192    }
193    fn format_file_name(&self, record: &CallRecord) -> String;
194    fn format_dump_events_path(&self, record: &CallRecord) -> String;
195    fn format_media_path(&self, record: &CallRecord, media: &CallRecordMedia) -> String;
196}
197
/// Formatter that lays records out under a root directory or key prefix.
pub struct DefaultCallRecordFormatter {
    /// Root directory (or key prefix) prepended to every generated path;
    /// trailing `/` characters are trimmed when paths are built.
    pub root: String,
}
201
202impl Default for DefaultCallRecordFormatter {
203    fn default() -> Self {
204        Self {
205            root: "./config/cdr".to_string(),
206        }
207    }
208}
209
210impl DefaultCallRecordFormatter {
211    pub fn new_with_config(config: &CallRecordConfig) -> Self {
212        let root = match config {
213            CallRecordConfig::Local { root } => root.clone(),
214            CallRecordConfig::S3 { root, .. } => root.clone(),
215            _ => "./config/cdr".to_string(),
216        };
217        Self { root }
218    }
219}
220
221impl CallRecordFormatter for DefaultCallRecordFormatter {
222    fn format_file_name(&self, record: &CallRecord) -> String {
223        let trimmed_root = self.root.trim_end_matches('/');
224        let file_name = default_cdr_file_name(record);
225        if trimmed_root.is_empty() {
226            file_name
227        } else {
228            format!(
229                "{}/{}/{}",
230                trimmed_root,
231                record.start_time.format("%Y%m%d"),
232                file_name
233            )
234        }
235    }
236
237    fn format_dump_events_path(&self, record: &CallRecord) -> String {
238        format!(
239            "{}/{}/{}.jsonl",
240            self.root.trim_end_matches('/'),
241            record.start_time.format("%Y%m%d"),
242            record.call_id
243        )
244    }
245
246    fn format_media_path(&self, record: &CallRecord, media: &CallRecordMedia) -> String {
247        let file_name = Path::new(&media.path)
248            .file_name()
249            .unwrap_or_else(|| std::ffi::OsStr::new("unknown"))
250            .to_string_lossy()
251            .to_string();
252
253        format!(
254            "{}/{}/{}_{}_{}",
255            self.root.trim_end_matches('/'),
256            record.start_time.format("%Y%m%d"),
257            record.call_id,
258            media.track_id,
259            file_name
260        )
261    }
262}
263
264pub fn build_object_store_from_s3(
265    vendor: &S3Vendor,
266    bucket: &str,
267    region: &str,
268    access_key: &str,
269    secret_key: &str,
270    endpoint: &str,
271) -> Result<Arc<dyn ObjectStore>> {
272    let store: Arc<dyn ObjectStore> = match vendor {
273        S3Vendor::AWS => {
274            let builder = AmazonS3Builder::new()
275                .with_bucket_name(bucket)
276                .with_region(region)
277                .with_access_key_id(access_key)
278                .with_secret_access_key(secret_key);
279
280            let instance = if !endpoint.is_empty() {
281                builder.with_endpoint(endpoint).build()?
282            } else {
283                builder.build()?
284            };
285            Arc::new(instance)
286        }
287        S3Vendor::GCP => {
288            let instance = GoogleCloudStorageBuilder::new()
289                .with_bucket_name(bucket)
290                .with_service_account_key(secret_key)
291                .build()?;
292            Arc::new(instance)
293        }
294        S3Vendor::Azure => {
295            let instance = MicrosoftAzureBuilder::new()
296                .with_container_name(bucket)
297                .with_account(access_key)
298                .with_access_key(secret_key)
299                .build()?;
300            Arc::new(instance)
301        }
302        S3Vendor::Aliyun | S3Vendor::Tencent | S3Vendor::Minio | S3Vendor::DigitalOcean => {
303            let instance = AmazonS3Builder::new()
304                .with_bucket_name(bucket)
305                .with_region(region)
306                .with_access_key_id(access_key)
307                .with_secret_access_key(secret_key)
308                .with_endpoint(endpoint)
309                .with_virtual_hosted_style_request(false)
310                .build()?;
311            Arc::new(instance)
312        }
313    };
314
315    Ok(store)
316}
317
/// Receives [`CallRecord`] values over a channel and persists them through
/// the configured saver callback.
pub struct CallRecordManager {
    /// Upper bound used by the receive loop for the number of saves it keeps
    /// in flight.
    pub max_concurrent: usize,
    /// Sender side of the record channel; records sent here are picked up by
    /// the receive loop.
    pub sender: CallRecordSender,
    /// Storage configuration handed to the saver callback.
    config: Arc<CallRecordConfig>,
    /// Cancelling this token makes `serve` return.
    cancel_token: CancellationToken,
    /// Receiver side of the record channel.
    receiver: CallRecordReceiver,
    /// Callback that persists one record.
    saver_fn: FnSaveCallRecord,
    /// Formatter handed to the saver callback.
    formatter: Arc<dyn CallRecordFormatter>,
}
327
/// Builder for [`CallRecordManager`]; every unset field falls back to a
/// default in `build`.
pub struct CallRecordManagerBuilder {
    /// Cancellation token; a default token is used when unset.
    pub cancel_token: Option<CancellationToken>,
    /// Storage configuration; `CallRecordConfig::default()` is used when unset.
    pub config: Option<CallRecordConfig>,
    /// Concurrency limit; 64 is used when unset.
    pub max_concurrent: Option<usize>,
    /// Saver callback; the manager's built-in saver is used when unset.
    saver_fn: Option<FnSaveCallRecord>,
    /// Formatter; a `DefaultCallRecordFormatter` is used when unset.
    formatter: Option<Arc<dyn CallRecordFormatter>>,
}
335
336impl CallRecordManagerBuilder {
337    pub fn new() -> Self {
338        Self {
339            cancel_token: None,
340            config: None,
341            max_concurrent: None,
342            saver_fn: None,
343            formatter: None,
344        }
345    }
346
347    pub fn with_cancel_token(mut self, cancel_token: CancellationToken) -> Self {
348        self.cancel_token = Some(cancel_token);
349        self
350    }
351
352    pub fn with_config(mut self, config: CallRecordConfig) -> Self {
353        self.config = Some(config);
354        self
355    }
356
357    pub fn with_saver(mut self, saver: FnSaveCallRecord) -> Self {
358        self.saver_fn = Some(saver);
359        self
360    }
361
362    pub fn with_formatter(mut self, formatter: Arc<dyn CallRecordFormatter>) -> Self {
363        self.formatter = Some(formatter);
364        self
365    }
366
367    pub fn with_max_concurrent(mut self, max_concurrent: usize) -> Self {
368        self.max_concurrent = Some(max_concurrent);
369        self
370    }
371
372    pub fn build(self) -> CallRecordManager {
373        let cancel_token = self.cancel_token.unwrap_or_default();
374        let config = Arc::new(self.config.unwrap_or_default());
375        let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
376        let saver_fn = self
377            .saver_fn
378            .unwrap_or_else(|| Arc::new(Box::new(CallRecordManager::default_saver)));
379        let formatter = self
380            .formatter
381            .unwrap_or_else(|| Arc::new(DefaultCallRecordFormatter::default()));
382        let max_concurrent = self.max_concurrent.unwrap_or(64);
383
384        match config.as_ref() {
385            CallRecordConfig::Local { root } => {
386                if !Path::new(&root).exists() {
387                    match std::fs::create_dir_all(&root) {
388                        Ok(_) => {
389                            info!("CallRecordManager created directory: {}", root);
390                        }
391                        Err(e) => {
392                            warn!("CallRecordManager failed to create directory: {}", e);
393                        }
394                    }
395                }
396            }
397            _ => {}
398        }
399
400        CallRecordManager {
401            max_concurrent,
402            cancel_token,
403            sender,
404            receiver,
405            config,
406            saver_fn,
407            formatter,
408        }
409    }
410}
411
412impl CallRecordManager {
413    fn default_saver(
414        _cancel_token: CancellationToken,
415        formatter: Arc<dyn CallRecordFormatter>,
416        config: Arc<CallRecordConfig>,
417        record: CallRecord,
418    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
419        Box::pin(async move {
420            let mut record = record;
421            let start_time = Instant::now();
422            let result = match config.as_ref() {
423                CallRecordConfig::Local { .. } => {
424                    Self::save_local_record(formatter.clone(), &mut record).await
425                }
426                CallRecordConfig::S3 {
427                    vendor,
428                    bucket,
429                    region,
430                    access_key,
431                    secret_key,
432                    endpoint,
433                    with_media,
434                    keep_media_copy,
435                    ..
436                } => {
437                    Self::save_with_s3_like(
438                        formatter.clone(),
439                        vendor,
440                        bucket,
441                        region,
442                        access_key,
443                        secret_key,
444                        endpoint,
445                        with_media,
446                        keep_media_copy,
447                        &record,
448                    )
449                    .await
450                }
451                CallRecordConfig::Http {
452                    url,
453                    headers,
454                    with_media,
455                    keep_media_copy,
456                } => {
457                    Self::save_with_http(
458                        formatter.clone(),
459                        url,
460                        headers,
461                        with_media,
462                        keep_media_copy,
463                        &record,
464                    )
465                    .await
466                }
467            };
468            let file_name = match result {
469                Ok(file_name) => file_name,
470                Err(e) => {
471                    warn!("Failed to save call record: {}", e);
472                    return Err(e);
473                }
474            };
475            let elapsed = start_time.elapsed();
476            info!(
477                ?elapsed,
478                call_id = record.call_id,
479                file_name,
480                "CallRecordManager saved"
481            );
482            Ok(())
483        })
484    }
485
486    async fn save_local_record(
487        formatter: Arc<dyn CallRecordFormatter>,
488        record: &mut CallRecord,
489    ) -> Result<String> {
490        let file_content = formatter.format(record)?;
491        let file_name = formatter.format_file_name(record);
492
493        // Ensure parent directory exists
494        if let Some(parent) = Path::new(&file_name).parent() {
495            tokio::fs::create_dir_all(parent).await.map_err(|e| {
496                anyhow::anyhow!("Failed to create parent directory for {}: {}", file_name, e)
497            })?;
498        }
499
500        let mut file = File::create(&file_name).await.map_err(|e| {
501            anyhow::anyhow!("Failed to create call record file {}: {}", file_name, e)
502        })?;
503        file.write_all(file_content.as_bytes()).await?;
504        file.flush().await?;
505        Ok(file_name.to_string())
506    }
507
508    pub async fn save_with_http(
509        formatter: Arc<dyn CallRecordFormatter>,
510        url: &String,
511        headers: &Option<HashMap<String, String>>,
512        with_media: &Option<bool>,
513        keep_media_copy: &Option<bool>,
514        record: &CallRecord,
515    ) -> Result<String> {
516        let client = reqwest::Client::new();
517        // Serialize call record to JSON
518        let call_log_json = formatter.format(record)?;
519        // Create multipart form
520        let mut form = reqwest::multipart::Form::new().text("calllog.json", call_log_json);
521
522        // Add media files if with_media is true
523        if with_media.unwrap_or(false) {
524            for media in &record.recorder {
525                if std::path::Path::new(&media.path).exists() {
526                    match tokio::fs::read(&media.path).await {
527                        Ok(file_content) => {
528                            let file_name = std::path::Path::new(&media.path)
529                                .file_name()
530                                .unwrap_or_else(|| std::ffi::OsStr::new("unknown"))
531                                .to_string_lossy()
532                                .to_string();
533
534                            let part = match reqwest::multipart::Part::bytes(file_content)
535                                .file_name(file_name.clone())
536                                .mime_str("application/octet-stream")
537                            {
538                                Ok(part) => part,
539                                Err(_) => {
540                                    // Fallback to default MIME type if parsing fails
541                                    reqwest::multipart::Part::bytes(
542                                        tokio::fs::read(&media.path).await?,
543                                    )
544                                    .file_name(file_name)
545                                }
546                            };
547
548                            form = form.part(format!("media_{}", media.track_id), part);
549                        }
550                        Err(e) => {
551                            warn!("Failed to read media file {}: {}", media.path, e);
552                        }
553                    }
554                }
555            }
556            if let Some(dump_events_file) = &record.dump_event_file {
557                if Path::new(&dump_events_file).exists() {
558                    let file_name = Path::new(&dump_events_file)
559                        .file_name()
560                        .unwrap_or_else(|| std::ffi::OsStr::new("unknown"))
561                        .to_string_lossy()
562                        .to_string();
563                    match reqwest::multipart::Part::bytes(tokio::fs::read(&dump_events_file).await?)
564                        .file_name(file_name.clone())
565                        .mime_str("application/octet-stream")
566                    {
567                        Ok(part) => {
568                            form = form.part(format!("dump_events_{}", file_name), part);
569                        }
570                        Err(_) => {}
571                    };
572                }
573            }
574        }
575        let mut request = client.post(url).multipart(form);
576        if let Some(headers_map) = headers {
577            for (key, value) in headers_map {
578                request = request.header(key, value);
579            }
580        }
581        let response = request.send().await?;
582        if response.status().is_success() {
583            let response_text = response.text().await.unwrap_or_default();
584
585            if keep_media_copy.unwrap_or(false) {
586                for media in &record.recorder {
587                    let p = Path::new(&media.path);
588                    if p.exists() {
589                        tokio::fs::remove_file(p).await.ok();
590                    }
591                }
592            }
593            Ok(format!("HTTP upload successful: {}", response_text))
594        } else {
595            Err(anyhow::anyhow!(
596                "HTTP upload failed with status: {} - {}",
597                response.status(),
598                response.text().await.unwrap_or_default()
599            ))
600        }
601    }
602
603    pub async fn save_with_s3_like(
604        formatter: Arc<dyn CallRecordFormatter>,
605        vendor: &S3Vendor,
606        bucket: &String,
607        region: &String,
608        access_key: &String,
609        secret_key: &String,
610        endpoint: &String,
611        with_media: &Option<bool>,
612        keep_media_copy: &Option<bool>,
613        record: &CallRecord,
614    ) -> Result<String> {
615        let start_time = Instant::now();
616        let object_store =
617            build_object_store_from_s3(vendor, bucket, region, access_key, secret_key, endpoint)?;
618
619        // Serialize call record to JSON
620        let call_log_json = formatter.format(record)?;
621        // Upload call log JSON
622        let filename = formatter.format_file_name(record);
623        let local_files = vec![filename.clone()];
624        let json_path = ObjectPath::from(filename);
625        let buf_size = call_log_json.len();
626        match object_store.put(&json_path, call_log_json.into()).await {
627            Ok(_) => {
628                info!(
629                    elapsed = start_time.elapsed().as_secs_f64(),
630                    %json_path,
631                    buf_size,
632                    "upload call record"
633                );
634            }
635            Err(e) => {
636                warn!(
637                   %json_path,
638                    "failed to upload call record: {}", e
639                );
640            }
641        }
642        // Upload media files if with_media is true
643        if with_media.unwrap_or(false) {
644            let mut media_files = vec![];
645            for media in &record.recorder {
646                if Path::new(&media.path).exists() {
647                    let media_path = ObjectPath::from(formatter.format_media_path(record, media));
648                    media_files.push((media.path.clone(), media_path));
649                }
650            }
651            if let Some(dump_events_file) = &record.dump_event_file {
652                if Path::new(&dump_events_file).exists() {
653                    let dump_events_path =
654                        ObjectPath::from(formatter.format_dump_events_path(record));
655                    media_files.push((dump_events_file.clone(), dump_events_path));
656                }
657            }
658            for (path, media_path) in &media_files {
659                let start_time = Instant::now();
660                let file_content = match tokio::fs::read(path).await {
661                    Ok(file_content) => file_content,
662                    Err(e) => {
663                        warn!("failed to read media file {}: {}", path, e);
664                        continue;
665                    }
666                };
667                let buf_size = file_content.len();
668                match object_store.put(media_path, file_content.into()).await {
669                    Ok(_) => {
670                        info!(
671                            elapsed = start_time.elapsed().as_secs_f64(),
672                            %media_path,
673                            buf_size,
674                            "upload media file"
675                        );
676                    }
677                    Err(e) => {
678                        warn!(%media_path,"failed to upload media file: {}", e);
679                    }
680                }
681            }
682        }
683        // Optionally delete local media files if keep_media_copy is false
684        if !keep_media_copy.unwrap_or(false) {
685            for media in &record.recorder {
686                let p = Path::new(&media.path);
687                if p.exists() {
688                    tokio::fs::remove_file(p).await.ok();
689                }
690            }
691            for file_name in &local_files {
692                let p = Path::new(file_name);
693                if p.exists() {
694                    tokio::fs::remove_file(p).await.ok();
695                }
696            }
697        }
698
699        Ok(format!(
700            "{}/{}",
701            endpoint.trim_end_matches('/'),
702            json_path.to_string().trim_start_matches('/')
703        ))
704    }
705
706    pub async fn serve(&mut self) {
707        let token = self.cancel_token.clone();
708        info!("CallRecordManager serving");
709        tokio::select! {
710            _ = token.cancelled() => {}
711            _ = self.recv_loop() => {}
712        }
713        info!("CallRecordManager served");
714    }
715
716    async fn recv_loop(&mut self) -> Result<()> {
717        let mut futures = FuturesUnordered::new();
718        loop {
719            let limit = self.max_concurrent - futures.len();
720            if limit == 0 {
721                if let Some(_) = futures.next().await {}
722                continue;
723            }
724            let mut buffer = Vec::with_capacity(limit);
725            if self.receiver.recv_many(&mut buffer, limit).await == 0 {
726                break;
727            }
728
729            for record in buffer {
730                let cancel_token_ref = self.cancel_token.clone();
731                let save_fn_ref = self.saver_fn.clone();
732                let config_ref = self.config.clone();
733                let formatter_ref = self.formatter.clone();
734
735                futures.push(async move {
736                    if let Err(e) =
737                        save_fn_ref(cancel_token_ref, formatter_ref, config_ref, record).await
738                    {
739                        warn!("Failed to save call record: {}", e);
740                    }
741                });
742            }
743            while let Some(_) = futures.next().await {}
744        }
745        Ok(())
746    }
747}