spring_stream/
extractor.rs1pub use sea_streamer::Message;
2pub use sea_streamer::MessageHeader;
3pub use sea_streamer::SeaMessage;
4pub use sea_streamer::SeqNo;
5pub use sea_streamer::ShardId;
6pub use sea_streamer::SharedMessage;
7pub use sea_streamer::StreamKey;
8pub use sea_streamer::Timestamp;
9
10use spring::app::App;
11use spring::config::ConfigRegistry;
12use spring::config::Configurable;
13use spring::plugin::ComponentRegistry;
14use std::ops::Deref;
15use std::ops::DerefMut;
16
17pub trait FromMsg {
18 fn from_msg(msg: &SeaMessage, app: &App) -> Self;
19}
20
21impl FromMsg for StreamKey {
22 fn from_msg(msg: &SeaMessage, _app: &App) -> Self {
23 msg.stream_key()
24 }
25}
26
27impl FromMsg for SeqNo {
28 fn from_msg(msg: &SeaMessage, _app: &App) -> Self {
29 msg.sequence()
30 }
31}
32
33impl FromMsg for ShardId {
34 fn from_msg(msg: &SeaMessage, _app: &App) -> Self {
35 msg.shard_id()
36 }
37}
38
39impl FromMsg for Timestamp {
40 fn from_msg(msg: &SeaMessage, _app: &App) -> Self {
41 msg.timestamp()
42 }
43}
44
45impl FromMsg for MessageHeader {
46 fn from_msg(msg: &SeaMessage, _app: &App) -> Self {
47 MessageHeader::new(
48 msg.stream_key(),
49 msg.shard_id(),
50 msg.sequence(),
51 msg.timestamp(),
52 )
53 }
54}
55
56impl FromMsg for SharedMessage {
57 fn from_msg(msg: &SeaMessage, _app: &App) -> Self {
58 msg.to_owned()
59 }
60}
61
/// Extractor wrapper around a component value of type `T`.
///
/// The wrapped value is public and reachable as `.0`.
pub struct Component<T>(pub T);
63
64impl<T> FromMsg for Component<T>
65where
66 T: Clone + Send + Sync + 'static,
67{
68 fn from_msg(_msg: &SeaMessage, app: &App) -> Self {
69 match app.get_component_ref::<T>() {
70 Some(component) => Component(T::clone(&component)),
71 None => panic!(
72 "There is no component of `{}` type",
73 std::any::type_name::<T>()
74 ),
75 }
76 }
77}
78
79impl<T> Deref for Component<T> {
80 type Target = T;
81
82 fn deref(&self) -> &Self::Target {
83 &self.0
84 }
85}
86
87impl<T> DerefMut for Component<T> {
88 fn deref_mut(&mut self) -> &mut Self::Target {
89 &mut self.0
90 }
91}
92
/// Extractor wrapper around a value of type `T` decoded from a message's
/// JSON payload.
///
/// Only compiled when the `json` feature is enabled. The wrapped value is
/// public and reachable as `.0`.
#[cfg(feature = "json")]
pub struct Json<T>(pub T);
95
/// Lets a handler receive the message payload deserialized from JSON.
#[cfg(feature = "json")]
impl<T> FromMsg for Json<T>
where
    T: serde::de::DeserializeOwned,
{
    /// Deserializes the message payload as JSON into `T`; the application is
    /// not consulted.
    ///
    /// # Panics
    ///
    /// Panics when `deserialize_json` returns an error.
    fn from_msg(msg: &SeaMessage, _app: &App) -> Self {
        let payload = msg.message();
        let parsed: Result<T, _> = payload.deserialize_json();
        Json(parsed.expect("stream message parse as json failed"))
    }
}
109
/// Extractor wrapper around a configuration value of type `T`.
///
/// The struct itself places no bounds on `T`: the
/// `serde::de::DeserializeOwned + Configurable` requirements are stated on
/// the impl blocks that actually need them, so the wrapper can be named and
/// constructed without repeating those bounds.
///
/// The wrapped value is public and reachable as `.0`.
pub struct Config<T>(pub T);
113
114impl<T> FromMsg for Config<T>
115where
116 T: serde::de::DeserializeOwned + Configurable,
117{
118 fn from_msg(_msg: &SeaMessage, app: &App) -> Self {
119 match app.get_config::<T>() {
120 Ok(config) => Config(config),
121 Err(e) => panic!(
122 "get config failed for typeof {}: {}",
123 std::any::type_name::<T>(),
124 e
125 ),
126 }
127 }
128}
129
130impl<T> Deref for Config<T>
131where
132 T: serde::de::DeserializeOwned + Configurable,
133{
134 type Target = T;
135
136 fn deref(&self) -> &Self::Target {
137 &self.0
138 }
139}