gpsd_json/client.rs
1//! Asynchronous GPSD client implementation for communicating with the GPS daemon
2//!
3//! This module provides the async client interface for connecting to and
4//! communicating with a GPSD server. It supports multiple connection types,
5//! data streaming formats, and protocol versions using async/await syntax.
6//!
7//! For synchronous/blocking operations, see the `blocking` submodule.
8//!
9//! # Example
10//!
11//! ```no_run
12//! use gpsd_json::client::{GpsdClient, StreamOptions};
13//! use futures::StreamExt;
14//!
15//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
16//! // Connect to GPSD asynchronously
17//! let mut client = GpsdClient::connect("127.0.0.1:2947").await?;
18//!
19//! // Get version info
20//! let version = client.version().await?;
21//! println!("GPSD version: {}", version.release);
22//!
23//! // Start streaming GPS data
24//! let mut stream = client.stream(StreamOptions::json()).await?;
25//! while let Some(msg) = stream.next().await {
26//! println!("GPS data: {:?}", msg?);
27//! }
28//! # Ok(())
29//! # }
30//! ```
31
32use crate::{
33 Result,
34 error::GpsdJsonError,
35 protocol::{GpsdJsonDecodeAsync, GpsdJsonEncodeAsync, GpsdJsonRequest, GpsdJsonResponse, v3},
36};
37
38/// Blocking (synchronous) client implementation
39pub mod blocking;
40
41/// Trait defining a GPSD protocol version implementation
42///
43/// This trait specifies the protocol version and associated message types
44/// for a particular version of the GPSD JSON protocol.
45pub trait GpsdJsonProtocol: Send + Sync {
46 /// Major version number of the protocol
47 const API_VERSION_MAJOR: i32;
48 /// Minor version number of the protocol
49 const API_VERSION_MINOR: i32;
50
51 /// Request message type for this protocol version
52 type Request: GpsdJsonRequest + Send + Sync;
53 /// Response message type for this protocol version
54 type Response: GpsdJsonResponse + Send + Sync;
55}
56
57/// Marker trait for data stream output formats
58///
59/// This trait is used to distinguish between different output formats
60/// (JSON, NMEA, Raw) at the type level.
61pub trait StreamFormat {}
62
63/// JSON format for structured GPS data
64///
65/// Provides parsed GPS data as JSON objects including TPV (time/position/velocity),
66/// SKY (satellite information), and other message types.
67pub struct Json;
68impl StreamFormat for Json {}
69
70/// NMEA format for raw GPS sentences
71///
72/// Provides raw NMEA 0183 sentences from the GPS receiver,
73/// such as $GPGGA, $GPRMC, etc.
74pub struct Nmea;
75impl StreamFormat for Nmea {}
76
77/// Raw format for unprocessed GPS data
78///
79/// Provides raw binary data from the GPS receiver,
80/// optionally with hex dump formatting.
81pub struct Raw;
82impl StreamFormat for Raw {}
83
84/// Configuration options for GPS data streams
85///
86/// This struct allows configuring various aspects of the data stream,
87/// such as enabling specific data types or formatting options.
88///
89/// Use the format-specific constructors (`StreamOptions::json()`, etc.)
90/// to create instances with appropriate defaults.
91#[derive(Debug, Clone)]
92pub struct StreamOptions<F: StreamFormat> {
93 inner: v3::types::Watch,
94 _format: std::marker::PhantomData<F>,
95}
96
97impl<F: StreamFormat> StreamOptions<F> {
98 /// Enables or disables scaled output
99 ///
100 /// When enabled, GPSD applies scaling to output values.
101 /// This affects units and precision of reported values.
102 pub fn scaled(mut self, enable: bool) -> Self {
103 self.inner.scaled = Some(enable);
104 self
105 }
106
107 /// Enables or disables split24 mode
108 ///
109 /// When enabled, AIS type 24 messages are split into separate
110 /// part A and part B messages.
111 pub fn split24(mut self, enable: bool) -> Self {
112 self.inner.split24 = Some(enable);
113 self
114 }
115}
116
117impl StreamOptions<Json> {
118 /// Creates stream options for JSON format output
119 ///
120 /// Returns a configuration for receiving structured GPS data
121 /// as JSON messages.
122 pub fn json() -> StreamOptions<Json> {
123 let opts = v3::types::Watch {
124 enable: Some(true),
125 json: Some(true),
126 ..Default::default()
127 };
128
129 StreamOptions::<Json> {
130 inner: opts,
131 _format: std::marker::PhantomData,
132 }
133 }
134
135 /// Enables or disables PPS (Pulse Per Second) messages
136 ///
137 /// When enabled, the stream will include PPS timing messages
138 /// if the GPS receiver supports precision timing.
139 pub fn pps(mut self, enable: bool) -> Self {
140 self.inner.pps = Some(enable);
141 self
142 }
143
144 /// Enables or disables timing information
145 ///
146 /// When enabled, messages include detailed timing information
147 /// about when data was received and processed.
148 pub fn timing(mut self, enable: bool) -> Self {
149 self.inner.timing = Some(enable);
150 self
151 }
152}
153
154impl StreamOptions<Nmea> {
155 /// Creates stream options for NMEA format output
156 ///
157 /// Returns a configuration for receiving raw NMEA 0183 sentences
158 /// from the GPS receiver.
159 pub fn nmea() -> StreamOptions<Nmea> {
160 let opts = v3::types::Watch {
161 enable: Some(true),
162 nmea: Some(true),
163 ..Default::default()
164 };
165
166 StreamOptions::<Nmea> {
167 inner: opts,
168 _format: std::marker::PhantomData,
169 }
170 }
171
172 /// Specifies a particular GPS device to stream from
173 ///
174 /// # Arguments
175 /// * `device` - Path to the GPS device (e.g., "/dev/ttyUSB0")
176 pub fn device<S: AsRef<str>>(mut self, device: S) -> Self {
177 self.inner.device = Some(device.as_ref().into());
178 self
179 }
180}
181
182impl StreamOptions<Raw> {
183 /// Creates stream options for raw format output
184 ///
185 /// Returns a configuration for receiving raw binary data
186 /// from the GPS receiver.
187 pub fn raw() -> StreamOptions<Raw> {
188 let opts = v3::types::Watch {
189 enable: Some(true),
190 raw: Some(1),
191 ..Default::default()
192 };
193
194 StreamOptions::<Raw> {
195 inner: opts,
196 _format: std::marker::PhantomData,
197 }
198 }
199
200 /// Configures hex dump mode for raw data
201 ///
202 /// # Arguments
203 /// * `enable` - true for hex dump format, false for binary
204 pub fn hex_dump(mut self, enable: bool) -> Self {
205 if enable {
206 self.inner.raw = Some(1);
207 } else {
208 self.inner.raw = Some(2);
209 }
210 self
211 }
212
213 /// Specifies a particular GPS device to stream from
214 ///
215 /// # Arguments
216 /// * `device` - Path to the GPS device (e.g., "/dev/ttyUSB0")
217 pub fn device<S: AsRef<str>>(mut self, device: S) -> Self {
218 self.inner.device = Some(device.as_ref().into());
219 self
220 }
221}
222
223/// Core implementation of an asynchronous GPSD client
224///
225/// This struct provides the fundamental functionality for asynchronous
226/// communication with a GPSD server. It handles protocol negotiation,
227/// message serialization/deserialization, and maintains the connection state.
228///
229/// # Type Parameters
230/// * `Stream` - The underlying async I/O stream type (e.g., TcpStream)
231/// * `Proto` - The GPSD protocol version implementation
232#[derive(Debug)]
233pub struct GpsdClientCore<Stream, Proto> {
234 reader: futures_util::io::BufReader<Stream>,
235 buf: Vec<u8>,
236 _proto: std::marker::PhantomData<Proto>,
237}
238
239impl<Stream, Proto> GpsdClientCore<Stream, Proto>
240where
241 Proto: GpsdJsonProtocol,
242{
243 /// Opens a new GPSD client connection using the provided async stream
244 ///
245 /// This method initializes the client with the given async I/O stream and
246 /// performs protocol version negotiation with the GPSD server.
247 ///
248 /// # Arguments
249 /// * `stream` - The async I/O stream for communication with GPSD
250 ///
251 /// # Returns
252 /// * `Ok(client)` - Successfully connected and negotiated protocol
253 /// * `Err(_)` - Connection or protocol negotiation failed
254 pub async fn open(stream: Stream) -> Result<Self>
255 where
256 Stream: futures_io::AsyncRead + futures_io::AsyncWrite + Unpin,
257 {
258 let reader = futures_util::io::BufReader::new(stream);
259 let mut client = GpsdClientCore {
260 reader,
261 buf: Vec::new(),
262 _proto: std::marker::PhantomData,
263 };
264
265 client.ensure_version().await?;
266 Ok(client)
267 }
268
269 /// Sends a request message to the GPSD server asynchronously
270 async fn send(&mut self, msg: &Proto::Request) -> Result<()>
271 where
272 Stream: futures_io::AsyncWrite + Unpin,
273 {
274 self.reader.write_request(msg).await
275 }
276
277 /// Receives a response message from the GPSD server asynchronously
278 ///
279 /// Returns `None` if the connection is closed.
280 fn recv(&mut self) -> impl std::future::Future<Output = Result<Option<Proto::Response>>>
281 where
282 Stream: futures_io::AsyncRead + Unpin,
283 {
284 futures_util::future::poll_fn(|cx| {
285 std::pin::Pin::new(&mut self.reader).poll_response::<Proto::Response>(cx, &mut self.buf)
286 })
287 }
288
289 /// Ensures the connected GPSD server supports this protocol version
290 ///
291 /// Reads the version message from GPSD and verifies compatibility.
292 /// The client requires the major version to match exactly and the
293 /// minor version to be greater than or equal to the expected version.
294 async fn ensure_version(&mut self) -> Result<()>
295 where
296 Stream: futures_io::AsyncRead + Unpin,
297 {
298 use futures_util::AsyncBufReadExt;
299 self.buf.clear();
300 let bytes_read = self
301 .reader
302 .read_until(b'\n', &mut self.buf)
303 .await
304 .map_err(GpsdJsonError::IoError)?;
305
306 if bytes_read == 0 {
307 return Err(GpsdJsonError::ProtocolError(
308 "Connection closed by GPSD before version message",
309 ));
310 }
311
312 let ret = if let Ok(Some(v3::ResponseMessage::Version(version))) =
313 serde_json::from_slice(&self.buf)
314 {
315 if Proto::API_VERSION_MAJOR != version.proto_major
316 || Proto::API_VERSION_MINOR < version.proto_minor
317 {
318 Err(GpsdJsonError::UnsupportedProtocolVersion((
319 version.proto_major,
320 version.proto_minor,
321 )))
322 } else {
323 Ok(())
324 }
325 } else {
326 Err(GpsdJsonError::ProtocolError(
327 "Failed to read version message from GPSD",
328 ))
329 };
330
331 self.buf.clear();
332 ret
333 }
334}
335
336#[cfg(feature = "tokio")]
337impl<Proto> GpsdClientCore<tokio_util::compat::Compat<tokio::net::TcpStream>, Proto>
338where
339 Proto: GpsdJsonProtocol,
340{
341 /// Connects to a GPSD server over TCP asynchronously
342 ///
343 /// Creates an async TCP connection to the specified address and initializes
344 /// a GPSD client with protocol negotiation.
345 ///
346 /// # Arguments
347 /// * `addr` - Socket address of the GPSD server (e.g., "127.0.0.1:2947")
348 ///
349 /// # Example
350 /// ```no_run
351 /// # use gpsd_json::client::GpsdClient;
352 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
353 /// let client = GpsdClient::connect("127.0.0.1:2947").await?;
354 /// # Ok(())
355 /// # }
356 /// ```
357 pub async fn connect<A: tokio::net::ToSocketAddrs>(addr: A) -> Result<Self> {
358 use tokio_util::compat::TokioAsyncReadCompatExt;
359
360 let stream = tokio::net::TcpStream::connect(addr)
361 .await
362 .map_err(GpsdJsonError::IoError)?;
363 let client = GpsdClientCore::open(stream.compat()).await?;
364 Ok(client)
365 }
366}
367
368/// Type alias for an async GPSD client using protocol version 3
369///
370/// This is the most common async client type and should be used for
371/// connecting to modern GPSD servers (version 3.x) with async/await.
372#[cfg(feature = "proto-v3")]
373pub type GpsdClient<Stream> = GpsdClientCore<Stream, v3::V3>;
374
375impl<Stream> GpsdClient<Stream>
376where
377 Stream: futures_io::AsyncRead + futures_io::AsyncWrite + Unpin,
378{
379 /// Requests version information from the GPSD server
380 ///
381 /// Returns details about the GPSD server version, protocol version,
382 /// and capabilities.
383 pub async fn version(&mut self) -> Result<v3::response::Version> {
384 self.send(&v3::RequestMessage::Version).await?;
385 if let Some(v3::ResponseMessage::Version(version)) = self.recv().await? {
386 Ok(version)
387 } else {
388 Err(GpsdJsonError::ProtocolError(
389 "Expected version response from GPSD",
390 ))
391 }
392 }
393
394 /// Lists all GPS devices known to the GPSD server
395 ///
396 /// Returns information about each connected GPS receiver including
397 /// device paths, driver information, and current status.
398 pub async fn devices(&mut self) -> Result<v3::response::DeviceList> {
399 self.send(&v3::RequestMessage::Devices).await?;
400 if let Some(v3::ResponseMessage::Devices(devices)) = self.recv().await? {
401 Ok(devices)
402 } else {
403 Err(GpsdJsonError::ProtocolError(
404 "Expected devices response from GPSD",
405 ))
406 }
407 }
408
409 /// Gets information about the currently active GPS device
410 ///
411 /// Returns detailed information about the device currently being
412 /// used for GPS data.
413 pub async fn device(&mut self) -> Result<v3::types::Device> {
414 self.send(&v3::RequestMessage::Device(None)).await?;
415 if let Some(v3::ResponseMessage::Device(device)) = self.recv().await? {
416 Ok(device)
417 } else {
418 Err(GpsdJsonError::ProtocolError(
419 "Expected device response from GPSD",
420 ))
421 }
422 }
423
424 /// Enables data streaming from GPSD with default settings
425 ///
426 /// Returns the current watch configuration and list of available devices.
427 /// After calling this method, GPS data will be streamed from the server.
428 pub async fn watch(&mut self) -> Result<(v3::types::Watch, v3::response::DeviceList)> {
429 self.send(&v3::RequestMessage::Watch(None)).await?;
430 let Some(v3::ResponseMessage::Devices(devices)) = self.recv().await? else {
431 return Err(GpsdJsonError::ProtocolError(
432 "Expected devices response from GPSD",
433 ));
434 };
435 let Some(v3::ResponseMessage::Watch(watch)) = self.recv().await? else {
436 return Err(GpsdJsonError::ProtocolError(
437 "Expected watch response from GPSD",
438 ));
439 };
440
441 Ok((watch, devices))
442 }
443
444 /// Polls for the current GPS fix data
445 ///
446 /// Returns the most recent GPS fix information available from
447 /// all active devices.
448 pub async fn poll(&mut self) -> Result<v3::response::Poll> {
449 self.send(&v3::RequestMessage::Poll).await?;
450 if let Some(v3::ResponseMessage::Poll(poll)) = self.recv().await? {
451 Ok(poll)
452 } else {
453 Err(GpsdJsonError::ProtocolError(
454 "Expected poll response from GPSD",
455 ))
456 }
457 }
458
459 /// Enables or disables data streaming mode
460 ///
461 /// # Arguments
462 /// * `enable` - true to start streaming, false to stop
463 pub async fn watch_mode(&mut self, enable: bool) -> Result<()> {
464 let (watch, _devices) = self
465 .set_watch(v3::types::Watch {
466 enable: Some(enable),
467 ..Default::default()
468 })
469 .await?;
470
471 assert_eq!(watch.enable, Some(enable));
472 Ok(())
473 }
474
475 /// Starts a data stream with the specified format and options
476 ///
477 /// This method consumes the client and returns a stream iterator
478 /// that yields GPS data in the requested format.
479 ///
480 /// # Arguments
481 /// * `opts` - Stream configuration options
482 ///
483 /// # Example
484 /// ```no_run
485 /// # use gpsd_json::client::{GpsdClient, StreamOptions};
486 /// # use futures::StreamExt;
487 /// # async fn example(client: GpsdClient<impl futures_io::AsyncRead + futures_io::AsyncWrite + Unpin>) -> Result<(), Box<dyn std::error::Error>> {
488 /// let mut stream = client.stream(StreamOptions::json()).await?;
489 /// while let Some(msg) = stream.next().await {
490 /// println!("GPS data: {:?}", msg?);
491 /// }
492 /// # Ok(())
493 /// # }
494 /// ```
495 pub async fn stream<Format: StreamFormat>(
496 mut self,
497 opts: StreamOptions<Format>,
498 ) -> Result<GpsdDataStream<Stream, v3::V3, Format>> {
499 let (watch, _devices) = self.set_watch(opts.inner).await?;
500 assert_eq!(watch.enable, Some(true));
501
502 Ok(GpsdDataStream {
503 inner: self,
504 _format: std::marker::PhantomData,
505 })
506 }
507
508 /// Configures watch mode settings
509 ///
510 /// Internal method to set watch parameters and receive confirmation.
511 async fn set_watch(
512 &mut self,
513 watch: v3::types::Watch,
514 ) -> Result<(v3::types::Watch, v3::response::DeviceList)> {
515 self.send(&v3::RequestMessage::Watch(Some(watch))).await?;
516 let Some(v3::ResponseMessage::Devices(devices)) = self.recv().await? else {
517 return Err(GpsdJsonError::ProtocolError(
518 "Expected devices response from GPSD",
519 ));
520 };
521 let Some(v3::ResponseMessage::Watch(watch)) = self.recv().await? else {
522 return Err(GpsdJsonError::ProtocolError(
523 "Expected watch response from GPSD",
524 ));
525 };
526
527 Ok((watch, devices))
528 }
529}
530
531/// Async stream for receiving GPS data from GPSD
532///
533/// This struct provides an async stream interface (implements `futures::Stream`)
534/// for receiving continuous GPS data from a GPSD server. The format of the data
535/// depends on the stream format type parameter.
536///
537/// The stream continues until explicitly closed or an error occurs.
538///
539/// # Example
540/// ```no_run
541/// # use gpsd_json::client::{GpsdClient, StreamOptions};
542/// # use futures::StreamExt;
543/// # async fn example(client: GpsdClient<impl futures_io::AsyncRead + futures_io::AsyncWrite + Unpin>) -> Result<(), Box<dyn std::error::Error>> {
544/// let mut stream = client.stream(StreamOptions::json()).await?;
545/// while let Some(result) = stream.next().await {
546/// match result {
547/// Ok(data) => println!("Received: {:?}", data),
548/// Err(e) => eprintln!("Error: {}", e),
549/// }
550/// }
551/// # Ok(())
552/// # }
553/// ```
554pub struct GpsdDataStream<Stream, Proto, Format>
555where
556 Proto: GpsdJsonProtocol,
557 Format: StreamFormat,
558{
559 inner: GpsdClientCore<Stream, Proto>,
560 _format: std::marker::PhantomData<Format>,
561}
562
563impl<Stream, Format> GpsdDataStream<Stream, v3::V3, Format>
564where
565 Stream: futures_io::AsyncRead + futures_io::AsyncWrite + Unpin,
566 Format: StreamFormat,
567{
568 /// Closes the data stream and returns the client
569 ///
570 /// This method stops the GPS data stream and returns the underlying
571 /// client for further operations.
572 pub async fn close(mut self) -> Result<GpsdClient<Stream>> {
573 let watch = v3::types::Watch::default();
574 self.inner
575 .send(&v3::RequestMessage::Watch(Some(watch)))
576 .await?;
577
578 loop {
579 match self.inner.recv().await {
580 Ok(Some(v3::ResponseMessage::Watch(watch))) => {
581 assert_eq!(watch.enable, Some(false));
582 break;
583 }
584 Ok(Some(_)) | Err(GpsdJsonError::SerdeError(_)) => continue,
585 Err(e) => return Err(e),
586 Ok(None) => {
587 return Err(GpsdJsonError::ProtocolError(
588 "Stream ended unexpectedly while closing",
589 ));
590 }
591 }
592 }
593
594 self.inner.buf.clear();
595 Ok(self.inner)
596 }
597}
598
599impl<Stream, Proto> futures_util::Stream for GpsdDataStream<Stream, Proto, Json>
600where
601 Stream: futures_io::AsyncRead + Unpin,
602 Proto: GpsdJsonProtocol + Unpin,
603{
604 type Item = Result<Proto::Response>;
605
606 fn poll_next(
607 self: std::pin::Pin<&mut Self>,
608 cx: &mut std::task::Context<'_>,
609 ) -> std::task::Poll<Option<Self::Item>> {
610 let this = self.get_mut();
611 let reader = std::pin::Pin::new(&mut this.inner.reader);
612
613 match reader.poll_response::<Proto::Response>(cx, &mut this.inner.buf) {
614 std::task::Poll::Ready(Ok(Some(msg))) => std::task::Poll::Ready(Some(Ok(msg))),
615 std::task::Poll::Ready(Ok(None)) => std::task::Poll::Ready(None),
616 std::task::Poll::Ready(Err(e)) => std::task::Poll::Ready(Some(Err(e))),
617 std::task::Poll::Pending => std::task::Poll::Pending,
618 }
619 }
620}
621
622impl<Stream, Proto> futures_util::Stream for GpsdDataStream<Stream, Proto, Nmea>
623where
624 Stream: futures_io::AsyncRead + Unpin,
625 Proto: GpsdJsonProtocol + Unpin,
626{
627 type Item = Result<String>;
628
629 fn poll_next(
630 self: std::pin::Pin<&mut Self>,
631 cx: &mut std::task::Context<'_>,
632 ) -> std::task::Poll<Option<Self::Item>> {
633 let this = self.get_mut();
634 let reader = std::pin::Pin::new(&mut this.inner.reader);
635
636 match reader.poll_raw(cx, &mut this.inner.buf) {
637 std::task::Poll::Ready(Ok(Some(line))) => {
638 let line_str = String::from_utf8_lossy(&line).trim_end().to_string();
639 std::task::Poll::Ready(Some(Ok(line_str)))
640 }
641 std::task::Poll::Ready(Ok(None)) => std::task::Poll::Ready(None),
642 std::task::Poll::Ready(Err(e)) => std::task::Poll::Ready(Some(Err(e))),
643 std::task::Poll::Pending => std::task::Poll::Pending,
644 }
645 }
646}
647
648impl<Stream, Proto> futures_util::Stream for GpsdDataStream<Stream, Proto, Raw>
649where
650 Stream: futures_io::AsyncRead + Unpin,
651 Proto: GpsdJsonProtocol + Unpin,
652{
653 type Item = Result<Vec<u8>>;
654
655 fn poll_next(
656 self: std::pin::Pin<&mut Self>,
657 cx: &mut std::task::Context<'_>,
658 ) -> std::task::Poll<Option<Self::Item>> {
659 let this = self.get_mut();
660 let reader = std::pin::Pin::new(&mut this.inner.reader);
661
662 match reader.poll_raw(cx, &mut this.inner.buf) {
663 std::task::Poll::Ready(Ok(Some(line))) => std::task::Poll::Ready(Some(Ok(line))),
664 std::task::Poll::Ready(Ok(None)) => std::task::Poll::Ready(None),
665 std::task::Poll::Ready(Err(e)) => std::task::Poll::Ready(Some(Err(e))),
666 std::task::Poll::Pending => std::task::Poll::Pending,
667 }
668 }
669}