d3_geo_rs/
identity.rs

1use core::fmt::Debug;
2use std::sync::mpsc::Receiver;
3use std::sync::mpsc::SyncSender;
4use std::thread;
5use std::thread::JoinHandle;
6
7use geo::CoordFloat;
8use geo_types::Coord;
9
10use crate::projection::projector_common::ChannelStatus;
11use crate::projection::projector_common::Message;
12use crate::stream::Connectable;
13use crate::stream::Connected;
14use crate::stream::Stream;
15use crate::stream::StreamMT;
16use crate::stream::Unconnected;
17
/// A pass-through stage of the stream pipeline.
///
/// Once connected, every stream event it receives is forwarded to its
/// sink unchanged.
#[derive(Debug, Clone)]
pub struct Identity<STATE> {
    /// Connection state; holds the sink once the stage is connected.
    state: STATE,
}
24
25/// Not auto deriving here - it does not makes sense to provide
26/// a default for the connected state.
27impl Default for Identity<Unconnected> {
28    #[inline]
29    fn default() -> Self {
30        Self { state: Unconnected }
31    }
32}
33
34impl Connectable for Identity<Unconnected> {
35    /// The resultant builder type.
36    type Output<SC> = Identity<Connected<SC>>;
37
38    #[inline]
39    fn connect<SC>(&self, sink: SC) -> Self::Output<SC> {
40        Identity {
41            state: Connected { sink },
42        }
43    }
44}
45
46impl<EP, SINK, T> Stream for Identity<Connected<SINK>>
47where
48    SINK: Stream<EP = EP, T = T>,
49    T: CoordFloat,
50{
51    type EP = EP;
52    type T = T;
53
54    #[inline]
55    fn endpoint(&mut self) -> &mut Self::EP {
56        self.state.sink.endpoint()
57    }
58
59    #[inline]
60    fn line_end(&mut self) {
61        self.state.sink.line_end();
62    }
63
64    #[inline]
65    fn line_start(&mut self) {
66        self.state.sink.line_start();
67    }
68
69    #[inline]
70    fn point(&mut self, p: &Coord<Self::T>, m: Option<u8>) {
71        self.state.sink.point(p, m);
72    }
73
74    #[inline]
75    fn polygon_end(&mut self) {
76        self.state.sink.polygon_end();
77    }
78
79    #[inline]
80    fn polygon_start(&mut self) {
81        self.state.sink.polygon_start();
82    }
83
84    #[inline]
85    fn sphere(&mut self) {
86        self.state.sink.sphere();
87    }
88}
89
90impl<T> StreamMT<T> for Identity<Unconnected>
91where
92    T: 'static + CoordFloat + Send,
93{
94    /// Generate a thread which stage on the responsibility of the
95    /// `StreamTransformRadians` pipeline stage.
96    ///
97    /// Consumes a Self
98    fn gen_stage(
99        self,
100        tx: SyncSender<Message<T>>,
101        rx: Receiver<Message<T>>,
102    ) -> JoinHandle<ChannelStatus<T>> {
103        // Stage pipelines.
104        thread::spawn(move || {
105            // The thread takes ownership over `thread_tx`
106            // Each thread queues a message in the channel
107            let a;
108            'message_loop: loop {
109                a = match rx.recv() {
110                    Ok(message) => {
111                        let res_tx = match message {
112                            Message::Point(_)
113                            | Message::EndPoint(_)
114                            | Message::LineEnd
115                            | Message::LineStart
116                            | Message::PolygonStart
117                            | Message::PolygonEnd
118                            | Message::Sphere => tx.send(message),
119                            Message::ShutDown
120                            | Message::ShutDownWithReturn(_) => {
121                                if let Err(e) = tx.send(Message::ShutDown) {
122                                    return ChannelStatus::Tx(e);
123                                }
124                                return ChannelStatus::ShuntDownReceived;
125                            }
126                        };
127                        match res_tx {
128                            Ok(()) => {
129                                continue 'message_loop;
130                            }
131                            Err(e) => ChannelStatus::Tx(e),
132                        }
133                    }
134                    Err(e) => ChannelStatus::Rx(e),
135                };
136
137                break;
138            }
139            a
140        })
141    }
142}