gpsd_json/
client.rs

1//! Asynchronous GPSD client implementation for communicating with the GPS daemon
2//!
3//! This module provides the async client interface for connecting to and
4//! communicating with a GPSD server. It supports multiple connection types,
5//! data streaming formats, and protocol versions using async/await syntax.
6//!
7//! For synchronous/blocking operations, see the `blocking` submodule.
8//!
9//! # Example
10//!
11//! ```no_run
12//! use gpsd_json::client::{GpsdClient, StreamOptions};
13//! use futures::StreamExt;
14//!
15//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
16//! // Connect to GPSD asynchronously
17//! let mut client = GpsdClient::connect("127.0.0.1:2947").await?;
18//!
19//! // Get version info
20//! let version = client.version().await?;
21//! println!("GPSD version: {}", version.release);
22//!
23//! // Start streaming GPS data
24//! let mut stream = client.stream(StreamOptions::json()).await?;
25//! while let Some(msg) = stream.next().await {
26//!     println!("GPS data: {:?}", msg?);
27//! }
28//! # Ok(())
29//! # }
30//! ```
31
32use crate::{
33    Result,
34    error::GpsdJsonError,
35    protocol::{GpsdJsonDecodeAsync, GpsdJsonEncodeAsync, GpsdJsonRequest, GpsdJsonResponse, v3},
36};
37
38/// Blocking (synchronous) client implementation
39pub mod blocking;
40
41/// Trait defining a GPSD protocol version implementation
42///
43/// This trait specifies the protocol version and associated message types
44/// for a particular version of the GPSD JSON protocol.
45pub trait GpsdJsonProtocol: Send + Sync {
46    /// Major version number of the protocol
47    const API_VERSION_MAJOR: i32;
48    /// Minor version number of the protocol
49    const API_VERSION_MINOR: i32;
50
51    /// Request message type for this protocol version
52    type Request: GpsdJsonRequest + Send + Sync;
53    /// Response message type for this protocol version
54    type Response: GpsdJsonResponse + Send + Sync;
55}
56
/// Type-level tag for the output format of a data stream
///
/// The trait has no items; it only lets [`StreamOptions`] and
/// [`GpsdDataStream`] be parameterised by a format marker such as
/// [`Json`], [`Nmea`] or [`Raw`].
pub trait StreamFormat {}
62
63/// JSON format for structured GPS data
64///
65/// Provides parsed GPS data as JSON objects including TPV (time/position/velocity),
66/// SKY (satellite information), and other message types.
67pub struct Json;
68impl StreamFormat for Json {}
69
70/// NMEA format for raw GPS sentences
71///
72/// Provides raw NMEA 0183 sentences from the GPS receiver,
73/// such as $GPGGA, $GPRMC, etc.
74pub struct Nmea;
75impl StreamFormat for Nmea {}
76
77/// Raw format for unprocessed GPS data
78///
79/// Provides raw binary data from the GPS receiver,
80/// optionally with hex dump formatting.
81pub struct Raw;
82impl StreamFormat for Raw {}
83
84/// Configuration options for GPS data streams
85///
86/// This struct allows configuring various aspects of the data stream,
87/// such as enabling specific data types or formatting options.
88///
89/// Use the format-specific constructors (`StreamOptions::json()`, etc.)
90/// to create instances with appropriate defaults.
91#[derive(Debug, Clone)]
92pub struct StreamOptions<F: StreamFormat> {
93    inner: v3::types::Watch,
94    _format: std::marker::PhantomData<F>,
95}
96
97impl<F: StreamFormat> StreamOptions<F> {
98    /// Enables or disables scaled output
99    ///
100    /// When enabled, GPSD applies scaling to output values.
101    /// This affects units and precision of reported values.
102    pub fn scaled(mut self, enable: bool) -> Self {
103        self.inner.scaled = Some(enable);
104        self
105    }
106
107    /// Enables or disables split24 mode
108    ///
109    /// When enabled, AIS type 24 messages are split into separate
110    /// part A and part B messages.
111    pub fn split24(mut self, enable: bool) -> Self {
112        self.inner.split24 = Some(enable);
113        self
114    }
115}
116
117impl StreamOptions<Json> {
118    /// Creates stream options for JSON format output
119    ///
120    /// Returns a configuration for receiving structured GPS data
121    /// as JSON messages.
122    pub fn json() -> StreamOptions<Json> {
123        let opts = v3::types::Watch {
124            enable: Some(true),
125            json: Some(true),
126            ..Default::default()
127        };
128
129        StreamOptions::<Json> {
130            inner: opts,
131            _format: std::marker::PhantomData,
132        }
133    }
134
135    /// Enables or disables PPS (Pulse Per Second) messages
136    ///
137    /// When enabled, the stream will include PPS timing messages
138    /// if the GPS receiver supports precision timing.
139    pub fn pps(mut self, enable: bool) -> Self {
140        self.inner.pps = Some(enable);
141        self
142    }
143
144    /// Enables or disables timing information
145    ///
146    /// When enabled, messages include detailed timing information
147    /// about when data was received and processed.
148    pub fn timing(mut self, enable: bool) -> Self {
149        self.inner.timing = Some(enable);
150        self
151    }
152}
153
154impl StreamOptions<Nmea> {
155    /// Creates stream options for NMEA format output
156    ///
157    /// Returns a configuration for receiving raw NMEA 0183 sentences
158    /// from the GPS receiver.
159    pub fn nmea() -> StreamOptions<Nmea> {
160        let opts = v3::types::Watch {
161            enable: Some(true),
162            nmea: Some(true),
163            ..Default::default()
164        };
165
166        StreamOptions::<Nmea> {
167            inner: opts,
168            _format: std::marker::PhantomData,
169        }
170    }
171
172    /// Specifies a particular GPS device to stream from
173    ///
174    /// # Arguments
175    /// * `device` - Path to the GPS device (e.g., "/dev/ttyUSB0")
176    pub fn device<S: AsRef<str>>(mut self, device: S) -> Self {
177        self.inner.device = Some(device.as_ref().into());
178        self
179    }
180}
181
182impl StreamOptions<Raw> {
183    /// Creates stream options for raw format output
184    ///
185    /// Returns a configuration for receiving raw binary data
186    /// from the GPS receiver.
187    pub fn raw() -> StreamOptions<Raw> {
188        let opts = v3::types::Watch {
189            enable: Some(true),
190            raw: Some(1),
191            ..Default::default()
192        };
193
194        StreamOptions::<Raw> {
195            inner: opts,
196            _format: std::marker::PhantomData,
197        }
198    }
199
200    /// Configures hex dump mode for raw data
201    ///
202    /// # Arguments
203    /// * `enable` - true for hex dump format, false for binary
204    pub fn hex_dump(mut self, enable: bool) -> Self {
205        if enable {
206            self.inner.raw = Some(1);
207        } else {
208            self.inner.raw = Some(2);
209        }
210        self
211    }
212
213    /// Specifies a particular GPS device to stream from
214    ///
215    /// # Arguments
216    /// * `device` - Path to the GPS device (e.g., "/dev/ttyUSB0")
217    pub fn device<S: AsRef<str>>(mut self, device: S) -> Self {
218        self.inner.device = Some(device.as_ref().into());
219        self
220    }
221}
222
223/// Core implementation of an asynchronous GPSD client
224///
225/// This struct provides the fundamental functionality for asynchronous
226/// communication with a GPSD server. It handles protocol negotiation,
227/// message serialization/deserialization, and maintains the connection state.
228///
229/// # Type Parameters
230/// * `Stream` - The underlying async I/O stream type (e.g., TcpStream)
231/// * `Proto` - The GPSD protocol version implementation
232#[derive(Debug)]
233pub struct GpsdClientCore<Stream, Proto> {
234    reader: futures_util::io::BufReader<Stream>,
235    buf: Vec<u8>,
236    _proto: std::marker::PhantomData<Proto>,
237}
238
239impl<Stream, Proto> GpsdClientCore<Stream, Proto>
240where
241    Proto: GpsdJsonProtocol,
242{
243    /// Opens a new GPSD client connection using the provided async stream
244    ///
245    /// This method initializes the client with the given async I/O stream and
246    /// performs protocol version negotiation with the GPSD server.
247    ///
248    /// # Arguments
249    /// * `stream` - The async I/O stream for communication with GPSD
250    ///
251    /// # Returns
252    /// * `Ok(client)` - Successfully connected and negotiated protocol
253    /// * `Err(_)` - Connection or protocol negotiation failed
254    pub async fn open(stream: Stream) -> Result<Self>
255    where
256        Stream: futures_io::AsyncRead + futures_io::AsyncWrite + Unpin,
257    {
258        let reader = futures_util::io::BufReader::new(stream);
259        let mut client = GpsdClientCore {
260            reader,
261            buf: Vec::new(),
262            _proto: std::marker::PhantomData,
263        };
264
265        client.ensure_version().await?;
266        Ok(client)
267    }
268
269    /// Sends a request message to the GPSD server asynchronously
270    async fn send(&mut self, msg: &Proto::Request) -> Result<()>
271    where
272        Stream: futures_io::AsyncWrite + Unpin,
273    {
274        self.reader.write_request(msg).await
275    }
276
277    /// Receives a response message from the GPSD server asynchronously
278    ///
279    /// Returns `None` if the connection is closed.
280    fn recv(&mut self) -> impl std::future::Future<Output = Result<Option<Proto::Response>>>
281    where
282        Stream: futures_io::AsyncRead + Unpin,
283    {
284        futures_util::future::poll_fn(|cx| {
285            std::pin::Pin::new(&mut self.reader).poll_response::<Proto::Response>(cx, &mut self.buf)
286        })
287    }
288
289    /// Ensures the connected GPSD server supports this protocol version
290    ///
291    /// Reads the version message from GPSD and verifies compatibility.
292    /// The client requires the major version to match exactly and the
293    /// minor version to be greater than or equal to the expected version.
294    async fn ensure_version(&mut self) -> Result<()>
295    where
296        Stream: futures_io::AsyncRead + Unpin,
297    {
298        use futures_util::AsyncBufReadExt;
299        self.buf.clear();
300        let bytes_read = self
301            .reader
302            .read_until(b'\n', &mut self.buf)
303            .await
304            .map_err(GpsdJsonError::IoError)?;
305
306        if bytes_read == 0 {
307            return Err(GpsdJsonError::ProtocolError(
308                "Connection closed by GPSD before version message",
309            ));
310        }
311
312        let ret = if let Ok(Some(v3::ResponseMessage::Version(version))) =
313            serde_json::from_slice(&self.buf)
314        {
315            if Proto::API_VERSION_MAJOR != version.proto_major
316                || Proto::API_VERSION_MINOR < version.proto_minor
317            {
318                Err(GpsdJsonError::UnsupportedProtocolVersion((
319                    version.proto_major,
320                    version.proto_minor,
321                )))
322            } else {
323                Ok(())
324            }
325        } else {
326            Err(GpsdJsonError::ProtocolError(
327                "Failed to read version message from GPSD",
328            ))
329        };
330
331        self.buf.clear();
332        ret
333    }
334}
335
#[cfg(feature = "tokio")]
impl<Proto> GpsdClientCore<tokio_util::compat::Compat<tokio::net::TcpStream>, Proto>
where
    Proto: GpsdJsonProtocol,
{
    /// Connects to a GPSD server over TCP asynchronously
    ///
    /// Opens a Tokio TCP connection to `addr`, adapts it to the `futures`
    /// I/O traits and hands it to [`GpsdClientCore::open`], which performs
    /// the version check.
    ///
    /// # Arguments
    /// * `addr` - Socket address of the GPSD server (e.g., "127.0.0.1:2947")
    ///
    /// # Example
    /// ```no_run
    /// # use gpsd_json::client::GpsdClient;
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let client = GpsdClient::connect("127.0.0.1:2947").await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect<A: tokio::net::ToSocketAddrs>(addr: A) -> Result<Self> {
        use tokio_util::compat::TokioAsyncReadCompatExt;

        let tcp = tokio::net::TcpStream::connect(addr)
            .await
            .map_err(GpsdJsonError::IoError)?;
        Self::open(tcp.compat()).await
    }
}
367
/// Async GPSD client fixed to protocol version 3
///
/// Shorthand for [`GpsdClientCore`] with `v3::V3` as the protocol parameter.
/// Only available when the `proto-v3` feature is enabled.
#[cfg(feature = "proto-v3")]
pub type GpsdClient<Stream> = GpsdClientCore<Stream, v3::V3>;
374
375impl<Stream> GpsdClient<Stream>
376where
377    Stream: futures_io::AsyncRead + futures_io::AsyncWrite + Unpin,
378{
379    /// Requests version information from the GPSD server
380    ///
381    /// Returns details about the GPSD server version, protocol version,
382    /// and capabilities.
383    pub async fn version(&mut self) -> Result<v3::response::Version> {
384        self.send(&v3::RequestMessage::Version).await?;
385        if let Some(v3::ResponseMessage::Version(version)) = self.recv().await? {
386            Ok(version)
387        } else {
388            Err(GpsdJsonError::ProtocolError(
389                "Expected version response from GPSD",
390            ))
391        }
392    }
393
394    /// Lists all GPS devices known to the GPSD server
395    ///
396    /// Returns information about each connected GPS receiver including
397    /// device paths, driver information, and current status.
398    pub async fn devices(&mut self) -> Result<v3::response::DeviceList> {
399        self.send(&v3::RequestMessage::Devices).await?;
400        if let Some(v3::ResponseMessage::Devices(devices)) = self.recv().await? {
401            Ok(devices)
402        } else {
403            Err(GpsdJsonError::ProtocolError(
404                "Expected devices response from GPSD",
405            ))
406        }
407    }
408
409    /// Gets information about the currently active GPS device
410    ///
411    /// Returns detailed information about the device currently being
412    /// used for GPS data.
413    pub async fn device(&mut self) -> Result<v3::types::Device> {
414        self.send(&v3::RequestMessage::Device(None)).await?;
415        if let Some(v3::ResponseMessage::Device(device)) = self.recv().await? {
416            Ok(device)
417        } else {
418            Err(GpsdJsonError::ProtocolError(
419                "Expected device response from GPSD",
420            ))
421        }
422    }
423
424    /// Enables data streaming from GPSD with default settings
425    ///
426    /// Returns the current watch configuration and list of available devices.
427    /// After calling this method, GPS data will be streamed from the server.
428    pub async fn watch(&mut self) -> Result<(v3::types::Watch, v3::response::DeviceList)> {
429        self.send(&v3::RequestMessage::Watch(None)).await?;
430        let Some(v3::ResponseMessage::Devices(devices)) = self.recv().await? else {
431            return Err(GpsdJsonError::ProtocolError(
432                "Expected devices response from GPSD",
433            ));
434        };
435        let Some(v3::ResponseMessage::Watch(watch)) = self.recv().await? else {
436            return Err(GpsdJsonError::ProtocolError(
437                "Expected watch response from GPSD",
438            ));
439        };
440
441        Ok((watch, devices))
442    }
443
444    /// Polls for the current GPS fix data
445    ///
446    /// Returns the most recent GPS fix information available from
447    /// all active devices.
448    pub async fn poll(&mut self) -> Result<v3::response::Poll> {
449        self.send(&v3::RequestMessage::Poll).await?;
450        if let Some(v3::ResponseMessage::Poll(poll)) = self.recv().await? {
451            Ok(poll)
452        } else {
453            Err(GpsdJsonError::ProtocolError(
454                "Expected poll response from GPSD",
455            ))
456        }
457    }
458
459    /// Enables or disables data streaming mode
460    ///
461    /// # Arguments
462    /// * `enable` - true to start streaming, false to stop
463    pub async fn watch_mode(&mut self, enable: bool) -> Result<()> {
464        let (watch, _devices) = self
465            .set_watch(v3::types::Watch {
466                enable: Some(enable),
467                ..Default::default()
468            })
469            .await?;
470
471        assert_eq!(watch.enable, Some(enable));
472        Ok(())
473    }
474
475    /// Starts a data stream with the specified format and options
476    ///
477    /// This method consumes the client and returns a stream iterator
478    /// that yields GPS data in the requested format.
479    ///
480    /// # Arguments
481    /// * `opts` - Stream configuration options
482    ///
483    /// # Example
484    /// ```no_run
485    /// # use gpsd_json::client::{GpsdClient, StreamOptions};
486    /// # use futures::StreamExt;
487    /// # async fn example(client: GpsdClient<impl futures_io::AsyncRead + futures_io::AsyncWrite + Unpin>) -> Result<(), Box<dyn std::error::Error>> {
488    /// let mut stream = client.stream(StreamOptions::json()).await?;
489    /// while let Some(msg) = stream.next().await {
490    ///     println!("GPS data: {:?}", msg?);
491    /// }
492    /// # Ok(())
493    /// # }
494    /// ```
495    pub async fn stream<Format: StreamFormat>(
496        mut self,
497        opts: StreamOptions<Format>,
498    ) -> Result<GpsdDataStream<Stream, v3::V3, Format>> {
499        let (watch, _devices) = self.set_watch(opts.inner).await?;
500        assert_eq!(watch.enable, Some(true));
501
502        Ok(GpsdDataStream {
503            inner: self,
504            _format: std::marker::PhantomData,
505        })
506    }
507
508    /// Configures watch mode settings
509    ///
510    /// Internal method to set watch parameters and receive confirmation.
511    async fn set_watch(
512        &mut self,
513        watch: v3::types::Watch,
514    ) -> Result<(v3::types::Watch, v3::response::DeviceList)> {
515        self.send(&v3::RequestMessage::Watch(Some(watch))).await?;
516        let Some(v3::ResponseMessage::Devices(devices)) = self.recv().await? else {
517            return Err(GpsdJsonError::ProtocolError(
518                "Expected devices response from GPSD",
519            ));
520        };
521        let Some(v3::ResponseMessage::Watch(watch)) = self.recv().await? else {
522            return Err(GpsdJsonError::ProtocolError(
523                "Expected watch response from GPSD",
524            ));
525        };
526
527        Ok((watch, devices))
528    }
529}
530
531/// Async stream for receiving GPS data from GPSD
532///
533/// This struct provides an async stream interface (implements `futures::Stream`)
534/// for receiving continuous GPS data from a GPSD server. The format of the data
535/// depends on the stream format type parameter.
536///
537/// The stream continues until explicitly closed or an error occurs.
538///
539/// # Example
540/// ```no_run
541/// # use gpsd_json::client::{GpsdClient, StreamOptions};
542/// # use futures::StreamExt;
543/// # async fn example(client: GpsdClient<impl futures_io::AsyncRead + futures_io::AsyncWrite + Unpin>) -> Result<(), Box<dyn std::error::Error>> {
544/// let mut stream = client.stream(StreamOptions::json()).await?;
545/// while let Some(result) = stream.next().await {
546///     match result {
547///         Ok(data) => println!("Received: {:?}", data),
548///         Err(e) => eprintln!("Error: {}", e),
549///     }
550/// }
551/// # Ok(())
552/// # }
553/// ```
554pub struct GpsdDataStream<Stream, Proto, Format>
555where
556    Proto: GpsdJsonProtocol,
557    Format: StreamFormat,
558{
559    inner: GpsdClientCore<Stream, Proto>,
560    _format: std::marker::PhantomData<Format>,
561}
562
563impl<Stream, Format> GpsdDataStream<Stream, v3::V3, Format>
564where
565    Stream: futures_io::AsyncRead + futures_io::AsyncWrite + Unpin,
566    Format: StreamFormat,
567{
568    /// Closes the data stream and returns the client
569    ///
570    /// This method stops the GPS data stream and returns the underlying
571    /// client for further operations.
572    pub async fn close(mut self) -> Result<GpsdClient<Stream>> {
573        let watch = v3::types::Watch::default();
574        self.inner
575            .send(&v3::RequestMessage::Watch(Some(watch)))
576            .await?;
577
578        loop {
579            match self.inner.recv().await {
580                Ok(Some(v3::ResponseMessage::Watch(watch))) => {
581                    assert_eq!(watch.enable, Some(false));
582                    break;
583                }
584                Ok(Some(_)) | Err(GpsdJsonError::SerdeError(_)) => continue,
585                Err(e) => return Err(e),
586                Ok(None) => {
587                    return Err(GpsdJsonError::ProtocolError(
588                        "Stream ended unexpectedly while closing",
589                    ));
590                }
591            }
592        }
593
594        self.inner.buf.clear();
595        Ok(self.inner)
596    }
597}
598
599impl<Stream, Proto> futures_util::Stream for GpsdDataStream<Stream, Proto, Json>
600where
601    Stream: futures_io::AsyncRead + Unpin,
602    Proto: GpsdJsonProtocol + Unpin,
603{
604    type Item = Result<Proto::Response>;
605
606    fn poll_next(
607        self: std::pin::Pin<&mut Self>,
608        cx: &mut std::task::Context<'_>,
609    ) -> std::task::Poll<Option<Self::Item>> {
610        let this = self.get_mut();
611        let reader = std::pin::Pin::new(&mut this.inner.reader);
612
613        match reader.poll_response::<Proto::Response>(cx, &mut this.inner.buf) {
614            std::task::Poll::Ready(Ok(Some(msg))) => std::task::Poll::Ready(Some(Ok(msg))),
615            std::task::Poll::Ready(Ok(None)) => std::task::Poll::Ready(None),
616            std::task::Poll::Ready(Err(e)) => std::task::Poll::Ready(Some(Err(e))),
617            std::task::Poll::Pending => std::task::Poll::Pending,
618        }
619    }
620}
621
622impl<Stream, Proto> futures_util::Stream for GpsdDataStream<Stream, Proto, Nmea>
623where
624    Stream: futures_io::AsyncRead + Unpin,
625    Proto: GpsdJsonProtocol + Unpin,
626{
627    type Item = Result<String>;
628
629    fn poll_next(
630        self: std::pin::Pin<&mut Self>,
631        cx: &mut std::task::Context<'_>,
632    ) -> std::task::Poll<Option<Self::Item>> {
633        let this = self.get_mut();
634        let reader = std::pin::Pin::new(&mut this.inner.reader);
635
636        match reader.poll_raw(cx, &mut this.inner.buf) {
637            std::task::Poll::Ready(Ok(Some(line))) => {
638                let line_str = String::from_utf8_lossy(&line).trim_end().to_string();
639                std::task::Poll::Ready(Some(Ok(line_str)))
640            }
641            std::task::Poll::Ready(Ok(None)) => std::task::Poll::Ready(None),
642            std::task::Poll::Ready(Err(e)) => std::task::Poll::Ready(Some(Err(e))),
643            std::task::Poll::Pending => std::task::Poll::Pending,
644        }
645    }
646}
647
648impl<Stream, Proto> futures_util::Stream for GpsdDataStream<Stream, Proto, Raw>
649where
650    Stream: futures_io::AsyncRead + Unpin,
651    Proto: GpsdJsonProtocol + Unpin,
652{
653    type Item = Result<Vec<u8>>;
654
655    fn poll_next(
656        self: std::pin::Pin<&mut Self>,
657        cx: &mut std::task::Context<'_>,
658    ) -> std::task::Poll<Option<Self::Item>> {
659        let this = self.get_mut();
660        let reader = std::pin::Pin::new(&mut this.inner.reader);
661
662        match reader.poll_raw(cx, &mut this.inner.buf) {
663            std::task::Poll::Ready(Ok(Some(line))) => std::task::Poll::Ready(Some(Ok(line))),
664            std::task::Poll::Ready(Ok(None)) => std::task::Poll::Ready(None),
665            std::task::Poll::Ready(Err(e)) => std::task::Poll::Ready(Some(Err(e))),
666            std::task::Poll::Pending => std::task::Poll::Pending,
667        }
668    }
669}