spring_stream/
extractor.rs

1pub use sea_streamer::Message;
2pub use sea_streamer::MessageHeader;
3pub use sea_streamer::SeaMessage;
4pub use sea_streamer::SeqNo;
5pub use sea_streamer::ShardId;
6pub use sea_streamer::SharedMessage;
7pub use sea_streamer::StreamKey;
8pub use sea_streamer::Timestamp;
9
10use spring::app::App;
11use spring::config::ConfigRegistry;
12use spring::config::Configurable;
13use spring::plugin::ComponentRegistry;
14use std::ops::Deref;
15use std::ops::DerefMut;
16
17pub trait FromMsg {
18    fn from_msg(msg: &SeaMessage, app: &App) -> Self;
19}
20
21impl FromMsg for StreamKey {
22    fn from_msg(msg: &SeaMessage, _app: &App) -> Self {
23        msg.stream_key()
24    }
25}
26
27impl FromMsg for SeqNo {
28    fn from_msg(msg: &SeaMessage, _app: &App) -> Self {
29        msg.sequence()
30    }
31}
32
33impl FromMsg for ShardId {
34    fn from_msg(msg: &SeaMessage, _app: &App) -> Self {
35        msg.shard_id()
36    }
37}
38
39impl FromMsg for Timestamp {
40    fn from_msg(msg: &SeaMessage, _app: &App) -> Self {
41        msg.timestamp()
42    }
43}
44
45impl FromMsg for MessageHeader {
46    fn from_msg(msg: &SeaMessage, _app: &App) -> Self {
47        MessageHeader::new(
48            msg.stream_key(),
49            msg.shard_id(),
50            msg.sequence(),
51            msg.timestamp(),
52        )
53    }
54}
55
56impl FromMsg for SharedMessage {
57    fn from_msg(msg: &SeaMessage, _app: &App) -> Self {
58        msg.to_owned()
59    }
60}
61
62pub struct Component<T>(pub T);
63
64impl<T> FromMsg for Component<T>
65where
66    T: Clone + Send + Sync + 'static,
67{
68    fn from_msg(_msg: &SeaMessage, app: &App) -> Self {
69        match app.get_component_ref::<T>() {
70            Some(component) => Component(T::clone(&component)),
71            None => panic!(
72                "There is no component of `{}` type",
73                std::any::type_name::<T>()
74            ),
75        }
76    }
77}
78
79impl<T> Deref for Component<T> {
80    type Target = T;
81
82    fn deref(&self) -> &Self::Target {
83        &self.0
84    }
85}
86
87impl<T> DerefMut for Component<T> {
88    fn deref_mut(&mut self) -> &mut Self::Target {
89        &mut self.0
90    }
91}
92
93#[cfg(feature = "json")]
94pub struct Json<T>(pub T);
95
96#[cfg(feature = "json")]
97impl<T> FromMsg for Json<T>
98where
99    T: serde::de::DeserializeOwned,
100{
101    fn from_msg(msg: &SeaMessage, _app: &App) -> Self {
102        let value = msg
103            .message()
104            .deserialize_json()
105            .expect("stream message parse as json failed");
106        Json(value)
107    }
108}
109
110pub struct Config<T>(pub T)
111where
112    T: serde::de::DeserializeOwned + Configurable;
113
114impl<T> FromMsg for Config<T>
115where
116    T: serde::de::DeserializeOwned + Configurable,
117{
118    fn from_msg(_msg: &SeaMessage, app: &App) -> Self {
119        match app.get_config::<T>() {
120            Ok(config) => Config(config),
121            Err(e) => panic!(
122                "get config failed for typeof {}: {}",
123                std::any::type_name::<T>(),
124                e
125            ),
126        }
127    }
128}
129
130impl<T> Deref for Config<T>
131where
132    T: serde::de::DeserializeOwned + Configurable,
133{
134    type Target = T;
135
136    fn deref(&self) -> &Self::Target {
137        &self.0
138    }
139}