active_call/callrecord/
mod.rs

1use crate::CallOption;
2use crate::{
3    call::ActiveCallType,
4    config::{CallRecordConfig, S3Vendor},
5};
6use anyhow::Result;
7use chrono::{DateTime, Utc};
8use futures::stream::{FuturesUnordered, StreamExt};
9use object_store::PutPayload;
10use object_store::{
11    ObjectStore, ObjectStoreExt, aws::AmazonS3Builder, azure::MicrosoftAzureBuilder,
12    gcp::GoogleCloudStorageBuilder, path::Path as ObjectPath,
13};
14use reqwest;
15use serde::{Deserialize, Serialize};
16use serde_with::skip_serializing_none;
17use std::{
18    collections::HashMap, future::Future, path::Path, pin::Pin, str::FromStr, sync::Arc,
19    time::Instant,
20};
21use tokio::{fs::File, io::AsyncWriteExt};
22use tokio_util::sync::CancellationToken;
23use tracing::{info, warn};
24
/// Sending half of the unbounded tokio mpsc channel that carries [`CallRecord`] values.
25pub type CallRecordSender = tokio::sync::mpsc::UnboundedSender<CallRecord>;
/// Receiving half of the unbounded tokio mpsc channel that carries [`CallRecord`] values.
26pub type CallRecordReceiver = tokio::sync::mpsc::UnboundedReceiver<CallRecord>;
27
/// Shareable, thread-safe callback that persists one [`CallRecord`].
///
/// The callback receives a cancellation token, the formatter used to render the record and
/// its paths, the shared [`CallRecordConfig`], and the record itself (by value). It returns a
/// boxed, `Send` future that resolves to `Ok(())` on success or an error on failure.
28pub type FnSaveCallRecord = Arc<
29    Box<
30        dyn Fn(
31                CancellationToken,
32                Arc<dyn CallRecordFormatter>,
33                Arc<CallRecordConfig>,
34                CallRecord,
35            ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>
36            + Send
37            + Sync,
38    >,
39>;
40
/// Category tag attached to every [`CallRecordEvent`] line.
///
/// Serialized in camelCase (`"event"`, `"command"`, `"sip"`).
41#[derive(Debug, Clone, Serialize, Deserialize)]
42#[serde(rename_all = "camelCase")]
43pub enum CallRecordEventType {
44    Event,
45    Command,
46    Sip,
47}
48
/// One entry of a call's event dump, serialized as a single JSON object with camelCase keys.
49#[derive(Debug, Clone, Serialize, Deserialize)]
50#[serde(rename_all = "camelCase")]
51pub struct CallRecordEvent {
    /// Category of the entry.
52    pub r#type: CallRecordEventType,
    /// Value of `crate::media::get_timestamp()` when the entry was written
    /// (unit not visible here; presumably milliseconds since the epoch, TODO confirm).
53    pub timestamp: u64,
    /// JSON-serialized payload of the object that was recorded.
54    pub content: String,
55}
56
57impl CallRecordEvent {
58    pub async fn write<T: Serialize>(r#type: CallRecordEventType, obj: T, file: &mut File) {
59        let content = match serde_json::to_string(&obj) {
60            Ok(s) => s,
61            Err(_) => return,
62        };
63        let event = Self {
64            r#type,
65            timestamp: crate::media::get_timestamp(),
66            content,
67        };
68        match serde_json::to_string(&event) {
69            Ok(line) => {
70                file.write_all(format!("{}\n", line).as_bytes()).await.ok();
71            }
72            Err(_) => {}
73        }
74    }
75}
76
/// Serializable description of one call, as handed to the call record savers.
///
/// Keys are camelCase; `Option` fields that are `None` are omitted on serialization
/// (`skip_serializing_none`), and the two vectors are omitted when empty.
77#[skip_serializing_none]
78#[derive(Debug, Clone, Serialize, Deserialize, Default)]
79#[serde(rename_all = "camelCase")]
80pub struct CallRecord {
    /// Kind of call this record describes.
81    pub call_type: ActiveCallType,
    /// Options the call was created with, if recorded.
82    pub option: Option<CallOption>,
    /// Identifier of the call; also used when building file and object names.
83    pub call_id: String,
    /// Start of the call; drives the date-based directory and file names.
84    pub start_time: DateTime<Utc>,
    /// Ring timestamp, if any.
85    pub ring_time: Option<DateTime<Utc>>,
    /// Answer timestamp, if any.
86    pub answer_time: Option<DateTime<Utc>>,
    /// End of the call.
87    pub end_time: DateTime<Utc>,
88    pub caller: String,
89    pub callee: String,
    /// Final status code (presumably a SIP response code; confirm with the producer).
90    pub status_code: u16,
    /// Why the call ended, if known.
91    pub hangup_reason: Option<CallRecordHangupReason>,
    /// Hangup messages collected for the call; omitted from JSON when empty.
92    #[serde(skip_serializing_if = "Vec::is_empty")]
93    #[serde(default)]
94    pub hangup_messages: Vec<CallRecordHangupMessage>,
    /// Recorded media files belonging to the call; omitted from JSON when empty.
95    #[serde(skip_serializing_if = "Vec::is_empty")]
96    #[serde(default)]
97    pub recorder: Vec<CallRecordMedia>,
    /// Free-form additional data.
98    pub extras: Option<HashMap<String, serde_json::Value>>,
    /// Local path of the event dump file, if one was written.
99    pub dump_event_file: Option<String>,
    /// Nested record of a related call (presumably the referred/transferred leg; confirm).
100    pub refer_callrecord: Option<Box<CallRecord>>,
101}
102
/// One recorded media file referenced by a [`CallRecord`].
103#[derive(Debug, Clone, Serialize, Deserialize)]
104#[serde(rename_all = "camelCase")]
105pub struct CallRecordMedia {
    /// Track the recording belongs to; used in upload part and object names.
106    pub track_id: String,
    /// Local filesystem path of the recording.
107    pub path: String,
    /// Size of the recording (presumably in bytes; confirm with the producer).
108    pub size: u64,
    /// Optional free-form metadata; omitted from JSON when `None`.
109    #[serde(skip_serializing_if = "Option::is_none")]
110    pub extra: Option<HashMap<String, serde_json::Value>>,
111}
112
/// Reason a call ended.
///
/// Serialized by serde with camelCase variant names. The `FromStr` and string conversions
/// in this file use a separate short form (for example `"caller"` for `ByCaller`).
113#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
114#[serde(rename_all = "camelCase")]
115pub enum CallRecordHangupReason {
116    ByCaller,
117    ByCallee,
118    ByRefer,
119    BySystem,
120    Autohangup,
121    InactivityTimeout,
122    NoAnswer,
123    NoBalance,
124    AnswerMachine,
125    ServerUnavailable,
126    Canceled,
127    Rejected,
128    Failed,
    /// Any reason not covered by the named variants; carries the original text.
129    Other(String),
130}
131
/// One hangup message stored in [`CallRecord::hangup_messages`].
132#[derive(Debug, Clone, Serialize, Deserialize)]
133#[serde(rename_all = "camelCase")]
134pub struct CallRecordHangupMessage {
    /// Numeric code of the message (presumably a SIP status code; confirm).
135    pub code: u16,
    /// Optional reason text; omitted from JSON when `None`.
136    #[serde(skip_serializing_if = "Option::is_none")]
137    pub reason: Option<String>,
    /// Optional target the message relates to; omitted from JSON when `None`.
138    #[serde(skip_serializing_if = "Option::is_none")]
139    pub target: Option<String>,
140}
141
142impl FromStr for CallRecordHangupReason {
143    type Err = anyhow::Error;
144
145    fn from_str(s: &str) -> Result<Self, Self::Err> {
146        match s.to_lowercase().as_str() {
147            "caller" => Ok(Self::ByCaller),
148            "callee" => Ok(Self::ByCallee),
149            "refer" => Ok(Self::ByRefer),
150            "system" => Ok(Self::BySystem),
151            "autohangup" => Ok(Self::Autohangup),
152            "inactivitytimeout" => Ok(Self::InactivityTimeout),
153            "noAnswer" => Ok(Self::NoAnswer),
154            "noBalance" => Ok(Self::NoBalance),
155            "answerMachine" => Ok(Self::AnswerMachine),
156            "serverUnavailable" => Ok(Self::ServerUnavailable),
157            "canceled" => Ok(Self::Canceled),
158            "rejected" => Ok(Self::Rejected),
159            "failed" => Ok(Self::Failed),
160            _ => Ok(Self::Other(s.to_string())),
161        }
162    }
163}
164impl ToString for CallRecordHangupReason {
165    fn to_string(&self) -> String {
166        match self {
167            Self::ByCaller => "caller".to_string(),
168            Self::ByCallee => "callee".to_string(),
169            Self::ByRefer => "refer".to_string(),
170            Self::BySystem => "system".to_string(),
171            Self::Autohangup => "autohangup".to_string(),
172            Self::InactivityTimeout => "inactivityTimeout".to_string(),
173            Self::NoAnswer => "noAnswer".to_string(),
174            Self::NoBalance => "noBalance".to_string(),
175            Self::AnswerMachine => "answerMachine".to_string(),
176            Self::ServerUnavailable => "serverUnavailable".to_string(),
177            Self::Canceled => "canceled".to_string(),
178            Self::Rejected => "rejected".to_string(),
179            Self::Failed => "failed".to_string(),
180            Self::Other(s) => s.to_string(),
181        }
182    }
183}
184
185pub fn default_cdr_file_name(record: &CallRecord) -> String {
186    format!(
187        "{}_{}.json",
188        record.start_time.format("%Y%m%d-%H%M%S"),
189        record.call_id
190    )
191}
192
193pub trait CallRecordFormatter: Send + Sync {
194    fn format(&self, record: &CallRecord) -> Result<String> {
195        Ok(serde_json::to_string(record)?)
196    }
197    fn format_file_name(&self, record: &CallRecord) -> String;
198    fn format_dump_events_path(&self, record: &CallRecord) -> String;
199    fn format_media_path(&self, record: &CallRecord, media: &CallRecordMedia) -> String;
200}
201
/// Built-in [`CallRecordFormatter`] that lays files out under `root` in per-day directories.
202pub struct DefaultCallRecordFormatter {
    /// Base directory or key prefix; trailing `/` characters are trimmed when paths are built.
203    pub root: String,
204}
205
206impl Default for DefaultCallRecordFormatter {
207    fn default() -> Self {
208        Self {
209            root: "./config/cdr".to_string(),
210        }
211    }
212}
213
214impl DefaultCallRecordFormatter {
215    pub fn new_with_config(config: &CallRecordConfig) -> Self {
216        let root = match config {
217            CallRecordConfig::Local { root } => root.clone(),
218            CallRecordConfig::S3 { root, .. } => root.clone(),
219            _ => "./config/cdr".to_string(),
220        };
221        Self { root }
222    }
223}
224
225impl CallRecordFormatter for DefaultCallRecordFormatter {
226    fn format_file_name(&self, record: &CallRecord) -> String {
227        let trimmed_root = self.root.trim_end_matches('/');
228        let file_name = default_cdr_file_name(record);
229        if trimmed_root.is_empty() {
230            file_name
231        } else {
232            format!(
233                "{}/{}/{}",
234                trimmed_root,
235                record.start_time.format("%Y%m%d"),
236                file_name
237            )
238        }
239    }
240
241    fn format_dump_events_path(&self, record: &CallRecord) -> String {
242        format!(
243            "{}/{}/{}.jsonl",
244            self.root.trim_end_matches('/'),
245            record.start_time.format("%Y%m%d"),
246            record.call_id
247        )
248    }
249
250    fn format_media_path(&self, record: &CallRecord, media: &CallRecordMedia) -> String {
251        let file_name = Path::new(&media.path)
252            .file_name()
253            .unwrap_or_else(|| std::ffi::OsStr::new("unknown"))
254            .to_string_lossy()
255            .to_string();
256
257        format!(
258            "{}/{}/{}_{}_{}",
259            self.root.trim_end_matches('/'),
260            record.start_time.format("%Y%m%d"),
261            record.call_id,
262            media.track_id,
263            file_name
264        )
265    }
266}
267
268pub fn build_object_store_from_s3(
269    vendor: &S3Vendor,
270    bucket: &str,
271    region: &str,
272    access_key: &str,
273    secret_key: &str,
274    endpoint: &str,
275) -> Result<Arc<dyn ObjectStore>> {
276    let store: Arc<dyn ObjectStore> = match vendor {
277        S3Vendor::AWS => {
278            let builder = AmazonS3Builder::new()
279                .with_bucket_name(bucket)
280                .with_region(region)
281                .with_access_key_id(access_key)
282                .with_secret_access_key(secret_key);
283
284            let instance = if !endpoint.is_empty() {
285                builder.with_endpoint(endpoint).build()?
286            } else {
287                builder.build()?
288            };
289            Arc::new(instance)
290        }
291        S3Vendor::GCP => {
292            let instance = GoogleCloudStorageBuilder::new()
293                .with_bucket_name(bucket)
294                .with_service_account_key(secret_key)
295                .build()?;
296            Arc::new(instance)
297        }
298        S3Vendor::Azure => {
299            let instance = MicrosoftAzureBuilder::new()
300                .with_container_name(bucket)
301                .with_account(access_key)
302                .with_access_key(secret_key)
303                .build()?;
304            Arc::new(instance)
305        }
306        S3Vendor::Aliyun | S3Vendor::Tencent | S3Vendor::Minio | S3Vendor::DigitalOcean => {
307            let instance = AmazonS3Builder::new()
308                .with_bucket_name(bucket)
309                .with_region(region)
310                .with_access_key_id(access_key)
311                .with_secret_access_key(secret_key)
312                .with_endpoint(endpoint)
313                .with_virtual_hosted_style_request(false)
314                .build()?;
315            Arc::new(instance)
316        }
317    };
318
319    Ok(store)
320}
321
/// Receives [`CallRecord`]s over a channel and persists each one through `saver_fn`.
322pub struct CallRecordManager {
    /// Upper bound on how many records are pulled from the channel and saved per batch.
323    pub max_concurrent: usize,
    /// Sending half of the record channel, for producers of records.
324    pub sender: CallRecordSender,
    /// Storage configuration passed to every save call.
325    config: Arc<CallRecordConfig>,
    /// Token that stops `serve` when cancelled; also passed to every save call.
326    cancel_token: CancellationToken,
    /// Receiving half of the record channel.
327    receiver: CallRecordReceiver,
    /// Callback that actually persists a record.
328    saver_fn: FnSaveCallRecord,
    /// Formatter passed to every save call.
329    formatter: Arc<dyn CallRecordFormatter>,
330}
331
/// Builder for [`CallRecordManager`]; every unset field falls back to a default in `build`.
332pub struct CallRecordManagerBuilder {
    /// Cancellation token; a fresh token is created when `None`.
333    pub cancel_token: Option<CancellationToken>,
    /// Storage configuration; `CallRecordConfig::default()` is used when `None`.
334    pub config: Option<CallRecordConfig>,
    /// Batch/concurrency limit; 64 is used when `None`.
335    pub max_concurrent: Option<usize>,
    /// Custom save callback; `CallRecordManager::default_saver` is used when `None`.
336    saver_fn: Option<FnSaveCallRecord>,
    /// Custom formatter; `DefaultCallRecordFormatter::default()` is used when `None`.
337    formatter: Option<Arc<dyn CallRecordFormatter>>,
338}
339
340impl CallRecordManagerBuilder {
341    pub fn new() -> Self {
342        Self {
343            cancel_token: None,
344            config: None,
345            max_concurrent: None,
346            saver_fn: None,
347            formatter: None,
348        }
349    }
350
351    pub fn with_cancel_token(mut self, cancel_token: CancellationToken) -> Self {
352        self.cancel_token = Some(cancel_token);
353        self
354    }
355
356    pub fn with_config(mut self, config: CallRecordConfig) -> Self {
357        self.config = Some(config);
358        self
359    }
360
361    pub fn with_saver(mut self, saver: FnSaveCallRecord) -> Self {
362        self.saver_fn = Some(saver);
363        self
364    }
365
366    pub fn with_formatter(mut self, formatter: Arc<dyn CallRecordFormatter>) -> Self {
367        self.formatter = Some(formatter);
368        self
369    }
370
371    pub fn with_max_concurrent(mut self, max_concurrent: usize) -> Self {
372        self.max_concurrent = Some(max_concurrent);
373        self
374    }
375
376    pub fn build(self) -> CallRecordManager {
377        let cancel_token = self.cancel_token.unwrap_or_default();
378        let config = Arc::new(self.config.unwrap_or_default());
379        let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
380        let saver_fn = self
381            .saver_fn
382            .unwrap_or_else(|| Arc::new(Box::new(CallRecordManager::default_saver)));
383        let formatter = self
384            .formatter
385            .unwrap_or_else(|| Arc::new(DefaultCallRecordFormatter::default()));
386        let max_concurrent = self.max_concurrent.unwrap_or(64);
387
388        match config.as_ref() {
389            CallRecordConfig::Local { root } => {
390                if !Path::new(&root).exists() {
391                    match std::fs::create_dir_all(&root) {
392                        Ok(_) => {
393                            info!("CallRecordManager created directory: {}", root);
394                        }
395                        Err(e) => {
396                            warn!("CallRecordManager failed to create directory: {}", e);
397                        }
398                    }
399                }
400            }
401            _ => {}
402        }
403
404        CallRecordManager {
405            max_concurrent,
406            cancel_token,
407            sender,
408            receiver,
409            config,
410            saver_fn,
411            formatter,
412        }
413    }
414}
415
416impl CallRecordManager {
417    fn default_saver(
418        _cancel_token: CancellationToken,
419        formatter: Arc<dyn CallRecordFormatter>,
420        config: Arc<CallRecordConfig>,
421        record: CallRecord,
422    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
423        Box::pin(async move {
424            let mut record = record;
425            let start_time = Instant::now();
426            let result = match config.as_ref() {
427                CallRecordConfig::Local { .. } => {
428                    Self::save_local_record(formatter.clone(), &mut record).await
429                }
430                CallRecordConfig::S3 {
431                    vendor,
432                    bucket,
433                    region,
434                    access_key,
435                    secret_key,
436                    endpoint,
437                    with_media,
438                    keep_media_copy,
439                    ..
440                } => {
441                    Self::save_with_s3_like(
442                        formatter.clone(),
443                        vendor,
444                        bucket,
445                        region,
446                        access_key,
447                        secret_key,
448                        endpoint,
449                        with_media,
450                        keep_media_copy,
451                        &record,
452                    )
453                    .await
454                }
455                CallRecordConfig::Http {
456                    url,
457                    headers,
458                    with_media,
459                    keep_media_copy,
460                } => {
461                    Self::save_with_http(
462                        formatter.clone(),
463                        url,
464                        headers,
465                        with_media,
466                        keep_media_copy,
467                        &record,
468                    )
469                    .await
470                }
471            };
472            let file_name = match result {
473                Ok(file_name) => file_name,
474                Err(e) => {
475                    warn!("Failed to save call record: {}", e);
476                    return Err(e);
477                }
478            };
479            let elapsed = start_time.elapsed();
480            info!(
481                ?elapsed,
482                call_id = record.call_id,
483                file_name,
484                "CallRecordManager saved"
485            );
486            Ok(())
487        })
488    }
489
490    async fn save_local_record(
491        formatter: Arc<dyn CallRecordFormatter>,
492        record: &mut CallRecord,
493    ) -> Result<String> {
494        let file_content = formatter.format(record)?;
495        let file_name = formatter.format_file_name(record);
496
497        // Ensure parent directory exists
498        if let Some(parent) = Path::new(&file_name).parent() {
499            tokio::fs::create_dir_all(parent).await.map_err(|e| {
500                anyhow::anyhow!("Failed to create parent directory for {}: {}", file_name, e)
501            })?;
502        }
503
504        let mut file = File::create(&file_name).await.map_err(|e| {
505            anyhow::anyhow!("Failed to create call record file {}: {}", file_name, e)
506        })?;
507        file.write_all(file_content.as_bytes()).await?;
508        file.flush().await?;
509        Ok(file_name.to_string())
510    }
511
512    pub async fn save_with_http(
513        formatter: Arc<dyn CallRecordFormatter>,
514        url: &String,
515        headers: &Option<HashMap<String, String>>,
516        with_media: &Option<bool>,
517        keep_media_copy: &Option<bool>,
518        record: &CallRecord,
519    ) -> Result<String> {
520        let client = reqwest::Client::new();
521        // Serialize call record to JSON
522        let call_log_json = formatter.format(record)?;
523        // Create multipart form
524        let mut form = reqwest::multipart::Form::new().text("calllog.json", call_log_json);
525
526        // Add media files if with_media is true
527        if with_media.unwrap_or(false) {
528            for media in &record.recorder {
529                if std::path::Path::new(&media.path).exists() {
530                    match tokio::fs::read(&media.path).await {
531                        Ok(file_content) => {
532                            let file_name = std::path::Path::new(&media.path)
533                                .file_name()
534                                .unwrap_or_else(|| std::ffi::OsStr::new("unknown"))
535                                .to_string_lossy()
536                                .to_string();
537
538                            let part = match reqwest::multipart::Part::bytes(file_content)
539                                .file_name(file_name.clone())
540                                .mime_str("application/octet-stream")
541                            {
542                                Ok(part) => part,
543                                Err(_) => {
544                                    // Fallback to default MIME type if parsing fails
545                                    reqwest::multipart::Part::bytes(
546                                        tokio::fs::read(&media.path).await?,
547                                    )
548                                    .file_name(file_name)
549                                }
550                            };
551
552                            form = form.part(format!("media_{}", media.track_id), part);
553                        }
554                        Err(e) => {
555                            warn!("Failed to read media file {}: {}", media.path, e);
556                        }
557                    }
558                }
559            }
560            if let Some(dump_events_file) = &record.dump_event_file {
561                if Path::new(&dump_events_file).exists() {
562                    let file_name = Path::new(&dump_events_file)
563                        .file_name()
564                        .unwrap_or_else(|| std::ffi::OsStr::new("unknown"))
565                        .to_string_lossy()
566                        .to_string();
567                    match reqwest::multipart::Part::bytes(tokio::fs::read(&dump_events_file).await?)
568                        .file_name(file_name.clone())
569                        .mime_str("application/octet-stream")
570                    {
571                        Ok(part) => {
572                            form = form.part(format!("dump_events_{}", file_name), part);
573                        }
574                        Err(_) => {}
575                    };
576                }
577            }
578        }
579        let mut request = client.post(url).multipart(form);
580        if let Some(headers_map) = headers {
581            for (key, value) in headers_map {
582                request = request.header(key, value);
583            }
584        }
585        let response = request.send().await?;
586        if response.status().is_success() {
587            let response_text = response.text().await.unwrap_or_default();
588
589            if keep_media_copy.unwrap_or(false) {
590                for media in &record.recorder {
591                    let p = Path::new(&media.path);
592                    if p.exists() {
593                        tokio::fs::remove_file(p).await.ok();
594                    }
595                }
596            }
597            Ok(format!("HTTP upload successful: {}", response_text))
598        } else {
599            Err(anyhow::anyhow!(
600                "HTTP upload failed with status: {} - {}",
601                response.status(),
602                response.text().await.unwrap_or_default()
603            ))
604        }
605    }
606
607    pub async fn save_with_s3_like(
608        formatter: Arc<dyn CallRecordFormatter>,
609        vendor: &S3Vendor,
610        bucket: &String,
611        region: &String,
612        access_key: &String,
613        secret_key: &String,
614        endpoint: &String,
615        with_media: &Option<bool>,
616        keep_media_copy: &Option<bool>,
617        record: &CallRecord,
618    ) -> Result<String> {
619        let start_time = Instant::now();
620        let object_store =
621            build_object_store_from_s3(vendor, bucket, region, access_key, secret_key, endpoint)?;
622
623        // Serialize call record to JSON
624        let call_log_json = formatter.format(record)?;
625        // Upload call log JSON
626        let filename = formatter.format_file_name(record);
627        let local_files = vec![filename.clone()];
628        let json_path = ObjectPath::from(filename);
629        let buf_size = call_log_json.len();
630        match object_store
631            .put(&json_path, PutPayload::from(call_log_json))
632            .await
633        {
634            Ok(_) => {
635                info!(
636                    elapsed = start_time.elapsed().as_secs_f64(),
637                    %json_path,
638                    buf_size,
639                    "upload call record"
640                );
641            }
642            Err(e) => {
643                warn!(
644                   %json_path,
645                    "failed to upload call record: {}", e
646                );
647            }
648        }
649        // Upload media files if with_media is true
650        if with_media.unwrap_or(false) {
651            let mut media_files = vec![];
652            for media in &record.recorder {
653                if Path::new(&media.path).exists() {
654                    let media_path = ObjectPath::from(formatter.format_media_path(record, media));
655                    media_files.push((media.path.clone(), media_path));
656                }
657            }
658            if let Some(dump_events_file) = &record.dump_event_file {
659                if Path::new(&dump_events_file).exists() {
660                    let dump_events_path =
661                        ObjectPath::from(formatter.format_dump_events_path(record));
662                    media_files.push((dump_events_file.clone(), dump_events_path));
663                }
664            }
665            for (path, media_path) in &media_files {
666                let start_time = Instant::now();
667                let file_content = match tokio::fs::read(path).await {
668                    Ok(file_content) => file_content,
669                    Err(e) => {
670                        warn!("failed to read media file {}: {}", path, e);
671                        continue;
672                    }
673                };
674                let buf_size = file_content.len();
675                match object_store
676                    .put(media_path, PutPayload::from(file_content))
677                    .await
678                {
679                    Ok(_) => {
680                        info!(
681                            elapsed = start_time.elapsed().as_secs_f64(),
682                            %media_path,
683                            buf_size,
684                            "upload media file"
685                        );
686                    }
687                    Err(e) => {
688                        warn!(%media_path,"failed to upload media file: {}", e);
689                    }
690                }
691            }
692        }
693        // Optionally delete local media files if keep_media_copy is false
694        if !keep_media_copy.unwrap_or(false) {
695            for media in &record.recorder {
696                let p = Path::new(&media.path);
697                if p.exists() {
698                    tokio::fs::remove_file(p).await.ok();
699                }
700            }
701            for file_name in &local_files {
702                let p = Path::new(file_name);
703                if p.exists() {
704                    tokio::fs::remove_file(p).await.ok();
705                }
706            }
707        }
708
709        Ok(format!(
710            "{}/{}",
711            endpoint.trim_end_matches('/'),
712            json_path.to_string().trim_start_matches('/')
713        ))
714    }
715
716    pub async fn serve(&mut self) {
717        let token = self.cancel_token.clone();
718        info!("CallRecordManager serving");
719        tokio::select! {
720            _ = token.cancelled() => {}
721            _ = self.recv_loop() => {}
722        }
723        info!("CallRecordManager served");
724    }
725
726    async fn recv_loop(&mut self) -> Result<()> {
727        let mut futures = FuturesUnordered::new();
728        loop {
729            let limit = self.max_concurrent - futures.len();
730            if limit == 0 {
731                if let Some(_) = futures.next().await {}
732                continue;
733            }
734            let mut buffer = Vec::with_capacity(limit);
735            if self.receiver.recv_many(&mut buffer, limit).await == 0 {
736                break;
737            }
738
739            for record in buffer {
740                let cancel_token_ref = self.cancel_token.clone();
741                let save_fn_ref = self.saver_fn.clone();
742                let config_ref = self.config.clone();
743                let formatter_ref = self.formatter.clone();
744
745                futures.push(async move {
746                    if let Err(e) =
747                        save_fn_ref(cancel_token_ref, formatter_ref, config_ref, record).await
748                    {
749                        warn!("Failed to save call record: {}", e);
750                    }
751                });
752            }
753            while let Some(_) = futures.next().await {}
754        }
755        Ok(())
756    }
757}