use deku::reader::Reader;
use deku::DekuReader;
use tracing::error;
use crate::decode::nmea::{MessageAssembler, NmeaAisMessage, NmeaError};
use crate::prelude::Message;
use std::io::{BufRead, BufReader, Cursor, Lines};
use std::net::TcpStream;
use tokio::io::{AsyncBufReadExt, BufReader as AsyncBufReader};
use tokio::net::TcpStream as AsyncTcpStream;
/// A reassembled message plus the metadata parsed from the
/// `\s:<serial>,c:<timestamp>*<checksum>` prefix of the input line that
/// completed it.
#[derive(Debug, Clone)]
pub struct TimestampedMessage {
/// Numeric value of the `c:` field in the line prefix.
/// NOTE(review): presumably a Unix timestamp; the unit is not established in this file — confirm.
pub timestamp: u64,
/// Payload handed back by the message assembler once a message is complete;
/// this is what `decode` parses.
pub binary_data: Vec<u8>,
/// Numeric value that follows `\s:` in the line prefix.
pub serial: u64,
}
impl TimestampedMessage {
    /// Attempts to parse `binary_data` into a [`Message`].
    ///
    /// The bytes are read through a deku [`Reader`] with the unit context `()`.
    /// Returns `None` when parsing fails; the underlying error is discarded.
    pub fn decode(&self) -> Option<Message> {
        let mut reader = Reader::new(Cursor::new(&self.binary_data));
        match Message::from_reader_with_ctx(&mut reader, ()) {
            Ok(message) => Some(message),
            Err(_) => None,
        }
    }
}
/// Blocking reader that turns timestamped NMEA lines into
/// [`TimestampedMessage`]s.
///
/// Lines are pulled from `R`, parsed, and handed to a [`MessageAssembler`];
/// an item is produced each time the assembler reports a complete message.
pub struct TimestampedNmeaIterator<R: BufRead> {
/// Line iterator over the underlying reader.
lines: Lines<R>,
/// Receives one parsed sentence per line and yields the payload once a
/// message is complete.
assembler: MessageAssembler,
}
impl<R: BufRead> TimestampedNmeaIterator<R> {
pub fn new(reader: R) -> Self {
Self {
lines: reader.lines(),
assembler: MessageAssembler::new(),
}
}
pub fn next_complete_message(&mut self) -> Option<Result<TimestampedMessage, NmeaError>> {
loop {
let line = match self.lines.next() {
Some(Ok(line)) => line,
Some(Err(_)) => return None, None => return None, };
let line = line.trim();
let (serial, timestamp, nmea_sentence) = match parse_timestamped_line(line) {
Ok(parsed) => parsed,
Err(e) => {
error!("Error parsing timestamped line '{}': {}", line, e);
continue;
}
};
let nmea_msg = match NmeaAisMessage::parse(&nmea_sentence) {
Ok(msg) => msg,
Err(e) => return Some(Err(e)),
};
match self.assembler.add_fragment(nmea_msg) {
Ok(Some(complete_message)) => {
return Some(Ok(TimestampedMessage {
timestamp,
binary_data: complete_message,
serial,
}))
}
Ok(None) => continue, Err(e) => return Some(Err(e)),
}
}
}
}
impl<R: BufRead> Iterator for TimestampedNmeaIterator<R> {
    type Item = Result<TimestampedMessage, NmeaError>;

    /// Yields the next item by delegating to `next_complete_message`.
    fn next(&mut self) -> Option<Self::Item> {
        Self::next_complete_message(self)
    }
}
/// Blocking source of [`TimestampedMessage`]s read from a TCP connection.
pub struct TimestampedNmeaTcpSource {
/// Line-based message iterator over the buffered TCP stream.
iterator: TimestampedNmeaIterator<BufReader<TcpStream>>,
}
impl TimestampedNmeaTcpSource {
    /// Connects to `server_address` and wraps the stream in a buffered
    /// [`TimestampedNmeaIterator`].
    ///
    /// # Errors
    ///
    /// Returns the `std::io::Error` from `TcpStream::connect` if the
    /// connection cannot be established.
    pub fn new(server_address: &str) -> Result<Self, std::io::Error> {
        let iterator = TcpStream::connect(server_address)
            .map(BufReader::new)
            .map(TimestampedNmeaIterator::new)?;
        Ok(Self { iterator })
    }
}
impl Iterator for TimestampedNmeaTcpSource {
    type Item = Result<TimestampedMessage, NmeaError>;

    /// Yields whatever the wrapped line iterator yields next.
    fn next(&mut self) -> Option<Self::Item> {
        Iterator::next(&mut self.iterator)
    }
}
/// Async counterpart of the blocking timestamped NMEA iterator: reads lines
/// from a tokio `AsyncBufRead` and produces [`TimestampedMessage`]s.
///
/// This type does not implement a stream trait in this file; items are pulled
/// through its inherent async `next` method.
pub struct AsyncTimestampedNmeaIterator<R: tokio::io::AsyncBufRead + Unpin> {
/// Async line reader over the underlying source.
lines: tokio::io::Lines<R>,
/// Receives one parsed sentence per line and yields the payload once a
/// message is complete.
assembler: MessageAssembler,
}
impl<R: tokio::io::AsyncBufRead + Unpin> AsyncTimestampedNmeaIterator<R> {
pub fn new(reader: R) -> Self {
Self {
lines: reader.lines(),
assembler: MessageAssembler::new(),
}
}
pub async fn next(&mut self) -> Option<Result<TimestampedMessage, NmeaError>> {
loop {
let line = match self.lines.next_line().await {
Ok(Some(line)) => line,
Ok(None) => return None, Err(_) => return None, };
let line = line.trim();
let (serial, timestamp, nmea_sentence) = match parse_timestamped_line(line) {
Ok(parsed) => parsed,
Err(e) => {
error!("Error parsing timestamped line '{}': {}", line, e);
continue;
}
};
let nmea_msg = match NmeaAisMessage::parse(&nmea_sentence) {
Ok(msg) => msg,
Err(e) => return Some(Err(e)),
};
match self.assembler.add_fragment(nmea_msg) {
Ok(Some(complete_message)) => {
return Some(Ok(TimestampedMessage {
timestamp,
binary_data: complete_message,
serial,
}))
}
Ok(None) => continue, Err(e) => return Some(Err(e)),
}
}
}
}
/// Async source of [`TimestampedMessage`]s read from a tokio TCP connection.
pub struct AsyncTimestampedNmeaTcpSource {
/// Line-based async message iterator over the buffered TCP stream.
iterator: AsyncTimestampedNmeaIterator<AsyncBufReader<AsyncTcpStream>>,
}
impl AsyncTimestampedNmeaTcpSource {
    /// Connects to `server_address` and wraps the stream in a buffered
    /// [`AsyncTimestampedNmeaIterator`].
    ///
    /// # Errors
    ///
    /// Returns the `std::io::Error` from the connect call if the connection
    /// cannot be established.
    pub async fn new(server_address: &str) -> Result<Self, std::io::Error> {
        let iterator = AsyncTimestampedNmeaIterator::new(AsyncBufReader::new(
            AsyncTcpStream::connect(server_address).await?,
        ));
        Ok(Self { iterator })
    }

    /// Awaits the next item from the wrapped iterator and returns it as is.
    pub async fn next(&mut self) -> Option<Result<TimestampedMessage, NmeaError>> {
        AsyncTimestampedNmeaIterator::next(&mut self.iterator).await
    }
}
/// Async source of [`TimestampedMessage`]s over any tokio `AsyncBufRead`.
///
/// Only compiled when the `ssh` feature is enabled.
/// NOTE(review): presumably intended for SSH-backed readers, given the
/// feature name — confirm against the caller.
#[cfg(feature = "ssh")]
pub struct AsyncTimestampedNmeaSource<R: tokio::io::AsyncBufRead + Unpin> {
/// Line-based async message iterator over the supplied reader.
iterator: AsyncTimestampedNmeaIterator<R>,
}
#[cfg(feature = "ssh")]
impl<R: tokio::io::AsyncBufRead + Unpin> AsyncTimestampedNmeaSource<R> {
    /// Builds a source that reads timestamped NMEA lines from `reader`.
    pub fn from_reader(reader: R) -> Self {
        let iterator = AsyncTimestampedNmeaIterator::new(reader);
        Self { iterator }
    }

    /// Awaits the next item from the wrapped iterator and returns it as is.
    pub async fn next(&mut self) -> Option<Result<TimestampedMessage, NmeaError>> {
        AsyncTimestampedNmeaIterator::next(&mut self.iterator).await
    }
}
/// Splits a line of the form
/// `\s:<serial>,c:<timestamp>*<checksum>\!<sentence body>` into its parts.
///
/// Returns `(serial, timestamp, sentence)`, where `sentence` is the text after
/// the first `\!` with the leading `!` restored (for example `!AIVDM,...`).
///
/// The text after `*` is required to be present but is not inspected; no
/// checksum validation is performed here.
///
/// # Errors
///
/// Returns a boxed message when:
/// * the line contains no `\!` separator,
/// * the prefix does not start with `\s:`,
/// * the prefix does not contain exactly one `*`,
/// * the part before `*` is not exactly two comma-separated fields,
/// * the first field is not a `u64`,
/// * the second field does not start with `c:` or its remainder is not a `u64`.
fn parse_timestamped_line(line: &str) -> Result<(u64, u64, String), Box<dyn std::error::Error>> {
    // The prefix ends at the first `\!`; everything after it is the sentence body.
    let (timestamp_part, sentence_body) = line
        .split_once("\\!")
        .ok_or("Invalid timestamped format: missing \\!")?;

    let timestamp_content = timestamp_part
        .strip_prefix("\\s:")
        .ok_or("Invalid timestamped format: missing \\s:")?;

    // Exactly one `*` must separate the fields from the checksum.
    let data_part = match timestamp_content.split_once('*') {
        Some((data, checksum)) if !checksum.contains('*') => data,
        _ => return Err("Invalid timestamped format: missing checksum".into()),
    };

    // Exactly two comma-separated fields: `<serial>` and `c:<timestamp>`.
    let (serial_field, timestamp_field) = match data_part.split_once(',') {
        Some((first, second)) if !second.contains(',') => (first, second),
        _ => return Err("Invalid timestamped format: expected s:serial,c:timestamp".into()),
    };

    let serial = serial_field
        .parse::<u64>()
        .map_err(|_| "Invalid serial number")?;

    let timestamp = timestamp_field
        .strip_prefix("c:")
        .ok_or("Invalid timestamped format: missing c: prefix")?
        .parse::<u64>()
        .map_err(|_| "Invalid timestamp")?;

    Ok((serial, timestamp, format!("!{}", sentence_body)))
}