1use chrono::{DateTime, Utc};
43use futures::StreamExt;
44use serde::{Deserialize, Serialize};
45use std::path::{Path, PathBuf};
46use std::time::{Duration, Instant};
47use thiserror::Error;
48use url::Url;
49
50use super::codec::{parse_stream_info, AudioCodec, CodecType, StreamInfo};
51use super::transport::{
52 establish_session, RtspSession, RtspTransport, SessionConfig, TransportError,
53};
54use crate::camera::Camera;
55
56#[derive(Debug, Error)]
58pub enum RtspError {
59 #[error("Connection failed: {0}")]
61 ConnectionFailed(String),
62
63 #[error("Authentication failed: invalid credentials")]
65 AuthError,
66
67 #[error("Operation timed out after {0:?}")]
69 Timeout(Duration),
70
71 #[error("Codec error: {0}")]
73 CodecError(String),
74
75 #[error("I/O error: {0}")]
77 IoError(#[from] std::io::Error),
78
79 #[error("Stream ended unexpectedly")]
81 StreamEnded,
82
83 #[error("Invalid RTSP URL: {0}")]
85 InvalidUrl(String),
86
87 #[error("Not connected - call connect() first")]
89 NotConnected,
90
91 #[error("Transport error: {0}")]
93 Transport(#[from] TransportError),
94
95 #[error("Failed to capture frame: {0}")]
97 FrameCapture(String),
98
99 #[error("Image encoding error: {0}")]
101 ImageEncoding(String),
102}
103
104#[derive(Debug, Clone, Serialize, Deserialize)]
106pub struct ClipOptions {
107 pub include_audio: bool,
109
110 pub audio_codec_override: Option<AudioCodec>,
112
113 pub container_format: ContainerFormat,
115
116 pub max_file_size: u64,
118}
119
120impl Default for ClipOptions {
121 fn default() -> Self {
122 Self {
123 include_audio: true,
124 audio_codec_override: None,
125 container_format: ContainerFormat::Mp4,
126 max_file_size: 0,
127 }
128 }
129}
130
131#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
133#[serde(rename_all = "lowercase")]
134pub enum ContainerFormat {
135 Mp4,
137 Mkv,
139}
140
141impl ContainerFormat {
142 pub fn extension(&self) -> &'static str {
144 match self {
145 Self::Mp4 => "mp4",
146 Self::Mkv => "mkv",
147 }
148 }
149}
150
151#[derive(Debug, Clone, Serialize, Deserialize)]
153pub struct SnapResult {
154 pub path: PathBuf,
156
157 pub size_bytes: u64,
159
160 pub codec: CodecType,
162
163 pub width: u32,
165
166 pub height: u32,
168
169 pub timestamp: DateTime<Utc>,
171}
172
173#[derive(Debug, Clone)]
175pub struct RawFrame {
176 pub rgb: Vec<u8>,
178
179 pub width: u32,
181
182 pub height: u32,
184
185 pub codec: CodecType,
187
188 pub timestamp: DateTime<Utc>,
190}
191
192#[derive(Debug, Clone, Serialize, Deserialize)]
194pub struct ClipResult {
195 pub path: PathBuf,
197
198 pub size_bytes: u64,
200
201 pub duration: Duration,
203
204 pub video_codec: CodecType,
206
207 pub audio_codec: Option<AudioCodec>,
209
210 pub timestamp: DateTime<Utc>,
212}
213
214pub struct RtspClient {
216 camera: Camera,
218
219 session: Option<RtspSession>,
221
222 stream_info: Option<StreamInfo>,
224
225 config: SessionConfig,
227}
228
229impl RtspClient {
230 pub fn new(camera: &Camera) -> Result<Self, RtspError> {
244 let url_str = camera.rtsp_url();
246 Url::parse(&url_str).map_err(|e| RtspError::InvalidUrl(format!("{e}: {url_str}")))?;
247
248 let transport = match camera.transport {
250 crate::camera::Transport::Tcp => RtspTransport::Tcp,
251 crate::camera::Transport::Udp => RtspTransport::Udp,
252 };
253
254 let config = SessionConfig::new()
255 .with_transport(transport)
256 .with_timeout(camera.timeout);
257
258 Ok(Self {
259 camera: camera.clone(),
260 session: None,
261 stream_info: None,
262 config,
263 })
264 }
265
266 pub async fn connect(&mut self) -> Result<(), RtspError> {
275 let url_str = self.camera.rtsp_url();
276 let url =
277 Url::parse(&url_str).map_err(|e| RtspError::InvalidUrl(format!("{e}: {url_str}")))?;
278
279 tracing::info!(
280 camera = %self.camera.name,
281 url = %self.camera.rtsp_url_redacted(),
282 "Connecting to RTSP stream"
283 );
284
285 let session = establish_session(&url, self.config.clone()).await?;
287
288 let stream_info = parse_stream_info(session.inner());
292
293 tracing::info!(
294 camera = %self.camera.name,
295 stream_info = %stream_info.description(),
296 "RTSP session established"
297 );
298
299 self.stream_info = Some(stream_info);
300 self.session = Some(session);
301
302 Ok(())
303 }
304
305 pub async fn snap(&mut self, output: &Path) -> Result<SnapResult, RtspError> {
326 let session = self.session.take().ok_or(RtspError::NotConnected)?;
327
328 let stream_info = self.stream_info.clone().ok_or(RtspError::NotConnected)?;
329
330 tracing::info!(
331 camera = %self.camera.name,
332 output = %output.display(),
333 "Capturing snapshot"
334 );
335
336 let mut demuxed = session
338 .session
339 .demuxed()
340 .map_err(|e| RtspError::FrameCapture(format!("Failed to start demuxing: {e}")))?;
341
342 let timeout = tokio::time::timeout(self.config.timeout, async {
344 while let Some(item) = demuxed.next().await {
345 let item =
346 item.map_err(|e| RtspError::FrameCapture(format!("Stream error: {e}")))?;
347 match item {
348 retina::codec::CodecItem::VideoFrame(frame) => {
349 if !frame.is_random_access_point() {
350 tracing::debug!("Skipping non-keyframe");
351 continue;
352 }
353
354 tracing::debug!(data_len = frame.data().len(), "Captured keyframe");
355
356 let data = frame.data();
357
358 match stream_info.video_codec {
360 CodecType::H264 => {
361 let (rgb, width, height) = decode_h264_frame(data)?;
362 let size = encode_image_to_file(&rgb, width, height, output)?;
363 return Ok(SnapResult {
364 path: output.to_path_buf(),
365 size_bytes: size,
366 codec: CodecType::H264,
367 width,
368 height,
369 timestamp: Utc::now(),
370 });
371 }
372 CodecType::Mjpeg => {
373 tokio::fs::write(output, data).await?;
375 let size = data.len() as u64;
376 return Ok(SnapResult {
377 path: output.to_path_buf(),
378 size_bytes: size,
379 codec: CodecType::Mjpeg,
380 width: stream_info.width.unwrap_or(0),
381 height: stream_info.height.unwrap_or(0),
382 timestamp: Utc::now(),
383 });
384 }
385 other => {
386 return Err(RtspError::CodecError(format!(
387 "Snapshot not supported for codec: {other}"
388 )));
389 }
390 }
391 }
392 _ => continue,
393 }
394 }
395 Err(RtspError::StreamEnded)
396 });
397
398 match timeout.await {
399 Ok(result) => result,
400 Err(_) => Err(RtspError::Timeout(self.config.timeout)),
401 }
402 }
403
404 pub async fn capture_raw_frame(&mut self) -> Result<RawFrame, RtspError> {
411 let session = self.session.take().ok_or(RtspError::NotConnected)?;
412
413 let stream_info = self.stream_info.clone().ok_or(RtspError::NotConnected)?;
414
415 let mut demuxed = session
416 .session
417 .demuxed()
418 .map_err(|e| RtspError::FrameCapture(format!("Failed to start demuxing: {e}")))?;
419
420 let timeout = tokio::time::timeout(self.config.timeout, async {
421 while let Some(item) = demuxed.next().await {
422 let item =
423 item.map_err(|e| RtspError::FrameCapture(format!("Stream error: {e}")))?;
424 match item {
425 retina::codec::CodecItem::VideoFrame(frame) => {
426 if !frame.is_random_access_point() {
427 continue;
428 }
429
430 let data = frame.data();
431
432 match stream_info.video_codec {
433 CodecType::H264 => {
434 let (rgb, width, height) = decode_h264_frame(data)?;
435 return Ok(RawFrame {
436 rgb,
437 width,
438 height,
439 codec: CodecType::H264,
440 timestamp: Utc::now(),
441 });
442 }
443 CodecType::Mjpeg => {
444 let img = image::load_from_memory(data).map_err(|e| {
446 RtspError::FrameCapture(format!("MJPEG decode error: {e}"))
447 })?;
448 let rgb_img = img.to_rgb8();
449 let width = rgb_img.width();
450 let height = rgb_img.height();
451 return Ok(RawFrame {
452 rgb: rgb_img.into_raw(),
453 width,
454 height,
455 codec: CodecType::Mjpeg,
456 timestamp: Utc::now(),
457 });
458 }
459 other => {
460 return Err(RtspError::CodecError(format!(
461 "Raw frame capture not supported for codec: {other}"
462 )));
463 }
464 }
465 }
466 _ => continue,
467 }
468 }
469 Err(RtspError::StreamEnded)
470 });
471
472 match timeout.await {
473 Ok(result) => result,
474 Err(_) => Err(RtspError::Timeout(self.config.timeout)),
475 }
476 }
477
478 pub async fn clip(
497 &mut self,
498 output: &Path,
499 duration: Duration,
500 options: ClipOptions,
501 ) -> Result<ClipResult, RtspError> {
502 let session = self.session.take().ok_or(RtspError::NotConnected)?;
503
504 let stream_info = self.stream_info.clone().ok_or(RtspError::NotConnected)?;
505
506 tracing::info!(
507 camera = %self.camera.name,
508 output = %output.display(),
509 duration_secs = duration.as_secs(),
510 include_audio = options.include_audio,
511 container = ?options.container_format,
512 "Recording clip"
513 );
514
515 let mut demuxed = session
517 .session
518 .demuxed()
519 .map_err(|e| RtspError::FrameCapture(format!("Failed to start demuxing: {e}")))?;
520
521 let mut file = tokio::fs::File::create(output).await?;
523 let start = Instant::now();
524 let timestamp = Utc::now();
525 let mut total_bytes: u64 = 0;
526 let mut got_keyframe = false;
527
528 let start_code: &[u8] = &[0x00, 0x00, 0x00, 0x01];
530
531 while start.elapsed() < duration {
532 let next = tokio::time::timeout(Duration::from_secs(5), demuxed.next()).await;
533
534 let item = match next {
535 Ok(Some(Ok(item))) => item,
536 Ok(Some(Err(e))) => {
537 tracing::warn!("Stream error during clip: {}", e);
538 break;
539 }
540 Ok(None) => {
541 tracing::info!("Stream ended during clip recording");
542 break;
543 }
544 Err(_) => {
545 tracing::warn!("Timeout waiting for next frame");
546 break;
547 }
548 };
549
550 match item {
551 retina::codec::CodecItem::VideoFrame(frame) => {
552 if !got_keyframe {
554 if !frame.is_random_access_point() {
555 continue;
556 }
557 got_keyframe = true;
558 }
559
560 let data = frame.data();
561
562 let mut pos = 0;
566 while pos + 4 <= data.len() {
567 let nalu_len = u32::from_be_bytes([
568 data[pos],
569 data[pos + 1],
570 data[pos + 2],
571 data[pos + 3],
572 ]) as usize;
573 pos += 4;
574
575 if pos + nalu_len > data.len() {
576 break;
577 }
578
579 use tokio::io::AsyncWriteExt;
580 file.write_all(start_code).await?;
581 file.write_all(&data[pos..pos + nalu_len]).await?;
582 total_bytes += (4 + nalu_len) as u64;
583 pos += nalu_len;
584 }
585
586 if options.max_file_size > 0 && total_bytes >= options.max_file_size {
588 tracing::info!("Max file size reached: {} bytes", total_bytes);
589 break;
590 }
591 }
592 _ => continue,
593 }
594 }
595
596 use tokio::io::AsyncWriteExt;
597 file.flush().await?;
598
599 let actual_duration = start.elapsed();
600 tracing::info!(
601 camera = %self.camera.name,
602 bytes = total_bytes,
603 duration_secs = actual_duration.as_secs_f64(),
604 "Clip recording complete"
605 );
606
607 Ok(ClipResult {
608 path: output.to_path_buf(),
609 size_bytes: total_bytes,
610 duration: actual_duration,
611 video_codec: stream_info.video_codec,
612 audio_codec: None, timestamp,
614 })
615 }
616
617 pub fn stream_info(&self) -> Option<&StreamInfo> {
625 self.stream_info.as_ref()
626 }
627
628 pub fn is_connected(&self) -> bool {
630 self.session.is_some()
631 }
632
633 pub fn camera(&self) -> &Camera {
635 &self.camera
636 }
637
638 pub async fn disconnect(&mut self) {
642 if let Some(session) = self.session.take() {
643 tracing::info!(
644 camera = %self.camera.name,
645 "Disconnecting RTSP session"
646 );
647
648 drop(session);
650
651 self.stream_info = None;
652 }
653 }
654
655 pub async fn reconnect(&mut self) -> Result<(), RtspError> {
663 tracing::info!(
664 camera = %self.camera.name,
665 "Attempting to reconnect"
666 );
667
668 self.disconnect().await;
669 self.connect().await
670 }
671}
672
673fn decode_h264_frame(data: &[u8]) -> Result<(Vec<u8>, u32, u32), RtspError> {
675 use openh264::decoder::Decoder;
676 use openh264::formats::YUVSource;
677
678 let mut decoder = Decoder::new()
679 .map_err(|e| RtspError::FrameCapture(format!("Failed to create H264 decoder: {e}")))?;
680
681 let mut annex_b = Vec::with_capacity(data.len());
685 let start_code: &[u8] = &[0x00, 0x00, 0x00, 0x01];
686 let mut pos = 0;
687 while pos + 4 <= data.len() {
688 let nalu_len =
689 u32::from_be_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
690 pos += 4;
691 if pos + nalu_len > data.len() {
692 break;
693 }
694 annex_b.extend_from_slice(start_code);
695 annex_b.extend_from_slice(&data[pos..pos + nalu_len]);
696 pos += nalu_len;
697 }
698
699 let decode_data = if annex_b.is_empty() { data } else { &annex_b };
702
703 let decoded = decoder
704 .decode(decode_data)
705 .map_err(|e| RtspError::FrameCapture(format!("H264 decode error: {e}")))?;
706
707 let decoded = decoded.ok_or_else(|| {
708 RtspError::FrameCapture("H264 decoder returned no image (need more data)".into())
709 })?;
710
711 let (width, height) = decoded.dimensions();
712 let width = width as u32;
713 let height = height as u32;
714
715 let mut rgb = vec![0u8; (width * height * 3) as usize];
717 decoded.write_rgb8(&mut rgb);
718
719 Ok((rgb, width, height))
720}
721
722fn encode_image_to_file(
724 rgb: &[u8],
725 width: u32,
726 height: u32,
727 path: &Path,
728) -> Result<u64, RtspError> {
729 let img = image::RgbImage::from_raw(width, height, rgb.to_vec())
730 .ok_or_else(|| RtspError::ImageEncoding("Failed to create image from RGB data".into()))?;
731
732 img.save(path)
733 .map_err(|e| RtspError::ImageEncoding(format!("Failed to save image: {e}")))?;
734
735 let metadata = std::fs::metadata(path)?;
736 Ok(metadata.len())
737}
738
739impl Drop for RtspClient {
740 fn drop(&mut self) {
741 if self.session.is_some() {
742 tracing::debug!(
743 camera = %self.camera.name,
744 "RtspClient dropped, session will be cleaned up"
745 );
746 }
747 }
748}
749
#[cfg(test)]
mod tests {
    use super::*;
    use crate::camera::{Camera, Protocol, Transport};
    use std::future::Future;
    use std::time::Duration as StdDuration;

    /// Camera fixture pointing at a private address; no test here connects to it.
    fn create_test_camera() -> Camera {
        Camera {
            name: "test-camera".to_string(),
            host: "192.168.1.100".to_string(),
            port: 554,
            username: Some("admin".to_string()),
            password: Some("password".to_string()),
            protocol: Protocol::Rtsp,
            transport: Transport::Tcp,
            stream: crate::camera::StreamType::Main,
            custom_path: None,
            audio_enabled: false,
            auth_method: crate::camera::AuthMethod::Auto,
            timeout: StdDuration::from_secs(10),
        }
    }

    /// Drives `future` to completion on a freshly created Tokio runtime.
    fn block_on<F: Future>(future: F) -> F::Output {
        tokio::runtime::Runtime::new().unwrap().block_on(future)
    }

    #[test]
    fn test_rtsp_client_creation() {
        let client = RtspClient::new(&create_test_camera());

        assert!(client.is_ok());
        let client = client.unwrap();
        assert!(!client.is_connected());
        assert!(client.stream_info().is_none());
    }

    #[test]
    fn test_rtsp_client_invalid_url() {
        // A host containing spaces cannot form a valid URL.
        let camera = Camera {
            host: "not a valid host!".to_string(),
            ..create_test_camera()
        };

        let client = RtspClient::new(&camera);
        assert!(client.is_err());
        match client {
            Err(RtspError::InvalidUrl(_)) => {}
            _ => panic!("Expected InvalidUrl error"),
        }
    }

    #[test]
    fn test_clip_options_default() {
        let options = ClipOptions::default();
        assert!(options.include_audio);
        assert!(options.audio_codec_override.is_none());
        assert_eq!(options.container_format, ContainerFormat::Mp4);
        assert_eq!(options.max_file_size, 0);
    }

    #[test]
    fn test_container_format_extension() {
        assert_eq!(ContainerFormat::Mp4.extension(), "mp4");
        assert_eq!(ContainerFormat::Mkv.extension(), "mkv");
    }

    #[test]
    fn test_snap_not_connected() {
        let mut client = RtspClient::new(&create_test_camera()).unwrap();

        // Without a session the call must fail before touching the path.
        let result = block_on(client.snap(Path::new("/tmp/test.jpg")));

        assert!(matches!(result, Err(RtspError::NotConnected)));
    }

    #[test]
    fn test_clip_not_connected() {
        let mut client = RtspClient::new(&create_test_camera()).unwrap();

        let result = block_on(client.clip(
            Path::new("/tmp/test.mp4"),
            StdDuration::from_secs(10),
            ClipOptions::default(),
        ));

        assert!(matches!(result, Err(RtspError::NotConnected)));
    }

    #[test]
    fn test_rtsp_error_display() {
        let cases = [
            (
                RtspError::ConnectionFailed("connection refused".into()),
                "Connection failed: connection refused",
            ),
            (
                RtspError::AuthError,
                "Authentication failed: invalid credentials",
            ),
            (
                RtspError::Timeout(StdDuration::from_secs(10)),
                "Operation timed out after 10s",
            ),
            (RtspError::StreamEnded, "Stream ended unexpectedly"),
        ];

        for (err, expected) in cases {
            assert_eq!(err.to_string(), expected);
        }
    }
}