Skip to main content

summer_stream/
extractor.rs

1pub use sea_streamer::Message;
2pub use sea_streamer::MessageHeader;
3pub use sea_streamer::SeaMessage;
4pub use sea_streamer::SeqNo;
5pub use sea_streamer::ShardId;
6pub use sea_streamer::SharedMessage;
7pub use sea_streamer::StreamKey;
8pub use sea_streamer::Timestamp;
9
10use summer::app::App;
11use summer::config::ConfigRegistry;
12use summer::config::Configurable;
13use summer::extractor::Component;
14use summer::extractor::Config;
15use summer::plugin::ComponentRegistry;
16
17pub trait FromMsg {
18    fn from_msg(msg: &SeaMessage, app: &App) -> Self;
19}
20
21impl FromMsg for StreamKey {
22    fn from_msg(msg: &SeaMessage, _app: &App) -> Self {
23        msg.stream_key()
24    }
25}
26
27impl FromMsg for SeqNo {
28    fn from_msg(msg: &SeaMessage, _app: &App) -> Self {
29        msg.sequence()
30    }
31}
32
33impl FromMsg for ShardId {
34    fn from_msg(msg: &SeaMessage, _app: &App) -> Self {
35        msg.shard_id()
36    }
37}
38
39impl FromMsg for Timestamp {
40    fn from_msg(msg: &SeaMessage, _app: &App) -> Self {
41        msg.timestamp()
42    }
43}
44
45impl FromMsg for MessageHeader {
46    fn from_msg(msg: &SeaMessage, _app: &App) -> Self {
47        MessageHeader::new(
48            msg.stream_key(),
49            msg.shard_id(),
50            msg.sequence(),
51            msg.timestamp(),
52        )
53    }
54}
55
56impl FromMsg for SharedMessage {
57    fn from_msg(msg: &SeaMessage, _app: &App) -> Self {
58        msg.to_owned()
59    }
60}
61
62impl<T> FromMsg for Component<T>
63where
64    T: Clone + Send + Sync + 'static,
65{
66    fn from_msg(_msg: &SeaMessage, app: &App) -> Self {
67        match app.get_component_ref::<T>() {
68            Some(component) => Component(T::clone(&component)),
69            None => panic!(
70                "There is no component of `{}` type",
71                std::any::type_name::<T>()
72            ),
73        }
74    }
75}
76
77impl<T> FromMsg for Config<T>
78where
79    T: serde::de::DeserializeOwned + Configurable,
80{
81    fn from_msg(_msg: &SeaMessage, app: &App) -> Self {
82        match app.get_config::<T>() {
83            Ok(config) => Config(config),
84            Err(e) => panic!(
85                "get config failed for typeof {}: {}",
86                std::any::type_name::<T>(),
87                e
88            ),
89        }
90    }
91}
92
/// Wrapper around a value of type `T` that is obtained by decoding a message
/// payload as JSON.
///
/// The inner value is public and reachable as `.0`. This type only exists
/// when the `json` feature is enabled.
#[cfg(feature = "json")]
pub struct Json<T>(pub T);
95
/// Extracts a `T` by decoding the message payload as JSON.
#[cfg(feature = "json")]
impl<T> FromMsg for Json<T>
where
    T: serde::de::DeserializeOwned,
{
    /// Takes the payload via `message()` and decodes it with
    /// `deserialize_json`; `_app` is not used.
    ///
    /// # Panics
    ///
    /// Panics with "stream message parse as json failed" when the payload
    /// cannot be deserialized into `T`.
    fn from_msg(msg: &SeaMessage, _app: &App) -> Self {
        let payload = msg.message();
        let decoded: T = payload
            .deserialize_json()
            .expect("stream message parse as json failed");
        Json(decoded)
    }
}