1use core::fmt::Debug;
2use std::sync::mpsc::Receiver;
3use std::sync::mpsc::SyncSender;
4use std::thread;
5use std::thread::JoinHandle;
6
7use geo::CoordFloat;
8use geo_types::Coord;
9
10use crate::projection::projector_common::ChannelStatus;
11use crate::projection::projector_common::Message;
12use crate::stream::Connectable;
13use crate::stream::Connected;
14use crate::stream::Stream;
15use crate::stream::StreamMT;
16use crate::stream::Unconnected;
17
/// A stream stage parameterised by its connection state.
///
/// `STATE` records whether the stage has been given a sink yet; the value
/// is stored verbatim in the private `state` field.
#[derive(Debug, Clone)]
pub struct Identity<STATE> {
    /// Connection state of this stage.
    state: STATE,
}
24
25impl Default for Identity<Unconnected> {
28 #[inline]
29 fn default() -> Self {
30 Self { state: Unconnected }
31 }
32}
33
34impl Connectable for Identity<Unconnected> {
35 type Output<SC> = Identity<Connected<SC>>;
37
38 #[inline]
39 fn connect<SC>(&self, sink: SC) -> Self::Output<SC> {
40 Identity {
41 state: Connected { sink },
42 }
43 }
44}
45
46impl<EP, SINK, T> Stream for Identity<Connected<SINK>>
47where
48 SINK: Stream<EP = EP, T = T>,
49 T: CoordFloat,
50{
51 type EP = EP;
52 type T = T;
53
54 #[inline]
55 fn endpoint(&mut self) -> &mut Self::EP {
56 self.state.sink.endpoint()
57 }
58
59 #[inline]
60 fn line_end(&mut self) {
61 self.state.sink.line_end();
62 }
63
64 #[inline]
65 fn line_start(&mut self) {
66 self.state.sink.line_start();
67 }
68
69 #[inline]
70 fn point(&mut self, p: &Coord<Self::T>, m: Option<u8>) {
71 self.state.sink.point(p, m);
72 }
73
74 #[inline]
75 fn polygon_end(&mut self) {
76 self.state.sink.polygon_end();
77 }
78
79 #[inline]
80 fn polygon_start(&mut self) {
81 self.state.sink.polygon_start();
82 }
83
84 #[inline]
85 fn sphere(&mut self) {
86 self.state.sink.sphere();
87 }
88}
89
90impl<T> StreamMT<T> for Identity<Unconnected>
91where
92 T: 'static + CoordFloat + Send,
93{
94 fn gen_stage(
99 self,
100 tx: SyncSender<Message<T>>,
101 rx: Receiver<Message<T>>,
102 ) -> JoinHandle<ChannelStatus<T>> {
103 thread::spawn(move || {
105 let a;
108 'message_loop: loop {
109 a = match rx.recv() {
110 Ok(message) => {
111 let res_tx = match message {
112 Message::Point(_)
113 | Message::EndPoint(_)
114 | Message::LineEnd
115 | Message::LineStart
116 | Message::PolygonStart
117 | Message::PolygonEnd
118 | Message::Sphere => tx.send(message),
119 Message::ShutDown
120 | Message::ShutDownWithReturn(_) => {
121 if let Err(e) = tx.send(Message::ShutDown) {
122 return ChannelStatus::Tx(e);
123 }
124 return ChannelStatus::ShuntDownReceived;
125 }
126 };
127 match res_tx {
128 Ok(()) => {
129 continue 'message_loop;
130 }
131 Err(e) => ChannelStatus::Tx(e),
132 }
133 }
134 Err(e) => ChannelStatus::Rx(e),
135 };
136
137 break;
138 }
139 a
140 })
141 }
142}