1use std::io::BufRead;
8use std::net::{TcpStream, ToSocketAddrs};
9
10use crate::client::{Json, Nmea, Raw, StreamFormat, StreamOptions};
11use crate::error::GpsdJsonError;
12use crate::protocol::{GpsdJsonDecode, GpsdJsonEncode, v3};
13use crate::{Result, client::GpsdJsonProtocol};
14
15#[derive(Debug)]
25pub struct GpsdClientCore<Stream, Proto> {
26 reader: std::io::BufReader<Stream>,
27 buf: Vec<u8>,
28 _proto: std::marker::PhantomData<Proto>,
29}
30
31impl<Stream, Proto> GpsdClientCore<Stream, Proto>
32where
33 Proto: GpsdJsonProtocol,
34{
35 pub fn open(stream: Stream) -> Result<Self>
47 where
48 Stream: std::io::Read + std::io::Write,
49 {
50 let reader = std::io::BufReader::new(stream);
51 let mut client = GpsdClientCore {
52 reader,
53 buf: Vec::new(),
54 _proto: std::marker::PhantomData,
55 };
56
57 client.ensure_version()?;
58 Ok(client)
59 }
60
61 fn send(&mut self, msg: &Proto::Request) -> Result<()>
63 where
64 Stream: std::io::Write,
65 {
66 self.reader.get_mut().write_request(msg)
67 }
68
69 fn recv(&mut self) -> Result<Option<Proto::Response>>
73 where
74 Stream: std::io::Read,
75 {
76 self.buf.clear();
77 match self.reader.read_response(&mut self.buf)? {
78 Some(resp) => Ok(Some(resp)),
79 None => Ok(None), }
81 }
82
83 fn ensure_version(&mut self) -> Result<()>
89 where
90 Stream: std::io::Read,
91 {
92 self.buf.clear();
93 if let Ok(Some(v3::ResponseMessage::Version(version))) =
94 self.reader.read_response(&mut self.buf)
95 {
96 if Proto::API_VERSION_MAJOR != version.proto_major
97 || Proto::API_VERSION_MINOR < version.proto_minor
98 {
99 Err(GpsdJsonError::UnsupportedProtocolVersion((
100 version.proto_major,
101 version.proto_minor,
102 )))
103 } else {
104 Ok(())
105 }
106 } else {
107 Err(GpsdJsonError::ProtocolError(
108 "Failed to read version message from GPSD",
109 ))
110 }
111 }
112}
113
114impl<Proto> GpsdClientCore<TcpStream, Proto>
115where
116 Proto: GpsdJsonProtocol,
117{
118 pub fn connect<A: ToSocketAddrs>(addr: A) -> Result<Self> {
136 let stream = TcpStream::connect(addr).map_err(GpsdJsonError::IoError)?;
137 Self::open(stream)
138 }
139}
140
141impl<Proto> TryFrom<TcpStream> for GpsdClientCore<TcpStream, Proto>
142where
143 Proto: GpsdJsonProtocol,
144{
145 type Error = GpsdJsonError;
146
147 fn try_from(stream: TcpStream) -> Result<Self> {
148 Self::open(stream)
149 }
150}
151
152#[cfg(feature = "proto-v3")]
157pub type GpsdClient<Stream> = GpsdClientCore<Stream, v3::V3>;
158
159impl<Stream> GpsdClient<Stream>
160where
161 Stream: std::io::Read + std::io::Write,
162{
163 pub fn version(&mut self) -> Result<v3::response::Version> {
168 self.send(&v3::RequestMessage::Version)?;
169 if let Some(v3::ResponseMessage::Version(version)) = self.recv()? {
170 Ok(version)
171 } else {
172 Err(GpsdJsonError::ProtocolError(
173 "Expected version response from GPSD",
174 ))
175 }
176 }
177
178 pub fn devices(&mut self) -> Result<v3::response::DeviceList> {
183 self.send(&v3::RequestMessage::Devices)?;
184 if let Some(v3::ResponseMessage::Devices(devices)) = self.recv()? {
185 Ok(devices)
186 } else {
187 Err(GpsdJsonError::ProtocolError(
188 "Expected devices response from GPSD",
189 ))
190 }
191 }
192
193 pub fn device(&mut self) -> Result<v3::types::Device> {
198 self.send(&v3::RequestMessage::Device(None))?;
199 if let Some(v3::ResponseMessage::Device(device)) = self.recv()? {
200 Ok(device)
201 } else {
202 Err(GpsdJsonError::ProtocolError(
203 "Expected device response from GPSD",
204 ))
205 }
206 }
207
208 pub fn watch(&mut self) -> Result<(v3::types::Watch, v3::response::DeviceList)> {
213 self.send(&v3::RequestMessage::Watch(None))?;
214 let Some(v3::ResponseMessage::Devices(devices)) = self.recv()? else {
215 return Err(GpsdJsonError::ProtocolError(
216 "Expected devices response from GPSD",
217 ));
218 };
219 let Some(v3::ResponseMessage::Watch(watch)) = self.recv()? else {
220 return Err(GpsdJsonError::ProtocolError(
221 "Expected watch response from GPSD",
222 ));
223 };
224
225 Ok((watch, devices))
226 }
227
228 pub fn poll(&mut self) -> Result<v3::response::Poll> {
233 self.send(&v3::RequestMessage::Poll)?;
234 if let Some(v3::ResponseMessage::Poll(poll)) = self.recv()? {
235 Ok(poll)
236 } else {
237 Err(GpsdJsonError::ProtocolError(
238 "Expected poll response from GPSD",
239 ))
240 }
241 }
242
243 pub fn watch_mode(&mut self, enable: bool) -> Result<()> {
248 let (watch, _devices) = self.set_watch(v3::types::Watch {
249 enable: Some(enable),
250 ..Default::default()
251 })?;
252
253 assert_eq!(watch.enable, Some(enable));
254 Ok(())
255 }
256
257 pub fn stream<Format: StreamFormat>(
273 mut self,
274 opts: StreamOptions<Format>,
275 ) -> Result<GpsdDataStream<Stream, v3::V3, Format>> {
276 let (watch, _devices) = self.set_watch(opts.inner)?;
277 assert_eq!(watch.enable, Some(true));
278
279 Ok(GpsdDataStream {
280 inner: self,
281 _format: std::marker::PhantomData,
282 })
283 }
284
285 fn set_watch(
289 &mut self,
290 watch: v3::types::Watch,
291 ) -> Result<(v3::types::Watch, v3::response::DeviceList)> {
292 self.send(&v3::RequestMessage::Watch(Some(watch)))?;
293 let Some(v3::ResponseMessage::Devices(devices)) = self.recv()? else {
294 return Err(GpsdJsonError::ProtocolError(
295 "Expected devices response from GPSD",
296 ));
297 };
298 let Some(v3::ResponseMessage::Watch(watch)) = self.recv()? else {
299 return Err(GpsdJsonError::ProtocolError(
300 "Expected watch response from GPSD",
301 ));
302 };
303
304 Ok((watch, devices))
305 }
306}
307
308pub struct GpsdDataStream<Stream, Proto, Format>
316where
317 Proto: GpsdJsonProtocol,
318 Format: StreamFormat,
319{
320 inner: GpsdClientCore<Stream, Proto>,
321 _format: std::marker::PhantomData<Format>,
322}
323
324impl<Stream, Format> GpsdDataStream<Stream, v3::V3, Format>
325where
326 Stream: std::io::Read + std::io::Write,
327 Format: StreamFormat,
328{
329 pub fn close(mut self) -> Result<GpsdClient<Stream>> {
334 let watch = v3::types::Watch::default();
335
336 let (watch, _devices) = self.inner.set_watch(watch)?;
337 assert_eq!(watch.enable, Some(false));
338
339 Ok(self.inner)
340 }
341}
342
343impl<Stream, Proto> Iterator for GpsdDataStream<Stream, Proto, Json>
344where
345 Stream: std::io::Read,
346 Proto: GpsdJsonProtocol,
347{
348 type Item = Result<Proto::Response>;
349
350 fn next(&mut self) -> Option<Self::Item> {
351 self.inner.recv().transpose()
352 }
353}
354
355impl<Stream, Proto> Iterator for GpsdDataStream<Stream, Proto, Nmea>
356where
357 Stream: std::io::Read,
358 Proto: GpsdJsonProtocol,
359{
360 type Item = Result<String>;
361
362 fn next(&mut self) -> Option<Self::Item> {
363 self.inner.buf.clear();
364
365 match self.inner.reader.read_until(b'\n', &mut self.inner.buf) {
366 Ok(0) => None, Ok(_) => Some(Ok(String::from_utf8_lossy(&self.inner.buf)
368 .trim_end()
369 .to_string())),
370 Err(e) => Some(Err(GpsdJsonError::IoError(e))),
371 }
372 }
373}
374
375impl<Stream, Proto> Iterator for GpsdDataStream<Stream, Proto, Raw>
376where
377 Stream: std::io::Read,
378 Proto: GpsdJsonProtocol,
379{
380 type Item = Result<String>;
381
382 fn next(&mut self) -> Option<Self::Item> {
383 self.inner.buf.clear();
384
385 match self.inner.reader.read_until(b'\n', &mut self.inner.buf) {
386 Ok(0) => None, Ok(_) => Some(Ok(String::from_utf8_lossy(&self.inner.buf)
388 .trim_end()
389 .to_string())),
390 Err(e) => Some(Err(GpsdJsonError::IoError(e))),
391 }
392 }
393}