summer_stream/
extractor.rs1pub use sea_streamer::Message;
2pub use sea_streamer::MessageHeader;
3pub use sea_streamer::SeaMessage;
4pub use sea_streamer::SeqNo;
5pub use sea_streamer::ShardId;
6pub use sea_streamer::SharedMessage;
7pub use sea_streamer::StreamKey;
8pub use sea_streamer::Timestamp;
9
10use summer::app::App;
11use summer::config::ConfigRegistry;
12use summer::config::Configurable;
13use summer::extractor::Component;
14use summer::extractor::Config;
15use summer::plugin::ComponentRegistry;
16
17pub trait FromMsg {
18 fn from_msg(msg: &SeaMessage, app: &App) -> Self;
19}
20
21impl FromMsg for StreamKey {
22 fn from_msg(msg: &SeaMessage, _app: &App) -> Self {
23 msg.stream_key()
24 }
25}
26
27impl FromMsg for SeqNo {
28 fn from_msg(msg: &SeaMessage, _app: &App) -> Self {
29 msg.sequence()
30 }
31}
32
33impl FromMsg for ShardId {
34 fn from_msg(msg: &SeaMessage, _app: &App) -> Self {
35 msg.shard_id()
36 }
37}
38
39impl FromMsg for Timestamp {
40 fn from_msg(msg: &SeaMessage, _app: &App) -> Self {
41 msg.timestamp()
42 }
43}
44
45impl FromMsg for MessageHeader {
46 fn from_msg(msg: &SeaMessage, _app: &App) -> Self {
47 MessageHeader::new(
48 msg.stream_key(),
49 msg.shard_id(),
50 msg.sequence(),
51 msg.timestamp(),
52 )
53 }
54}
55
56impl FromMsg for SharedMessage {
57 fn from_msg(msg: &SeaMessage, _app: &App) -> Self {
58 msg.to_owned()
59 }
60}
61
62impl<T> FromMsg for Component<T>
63where
64 T: Clone + Send + Sync + 'static,
65{
66 fn from_msg(_msg: &SeaMessage, app: &App) -> Self {
67 match app.get_component_ref::<T>() {
68 Some(component) => Component(T::clone(&component)),
69 None => panic!(
70 "There is no component of `{}` type",
71 std::any::type_name::<T>()
72 ),
73 }
74 }
75}
76
77impl<T> FromMsg for Config<T>
78where
79 T: serde::de::DeserializeOwned + Configurable,
80{
81 fn from_msg(_msg: &SeaMessage, app: &App) -> Self {
82 match app.get_config::<T>() {
83 Ok(config) => Config(config),
84 Err(e) => panic!(
85 "get config failed for typeof {}: {}",
86 std::any::type_name::<T>(),
87 e
88 ),
89 }
90 }
91}
92
/// Wrapper marking a value that is obtained by deserializing a message
/// payload as JSON.
///
/// The inner value is public, so it can be taken out with `.0` or by
/// destructuring (`Json(value)`).
///
/// `Debug`, `Clone`, `Copy` and `Default` are derived so the wrapper can be
/// logged and embedded in types that derive those traits; each impl is only
/// available when `T` itself implements the corresponding trait.
#[cfg(feature = "json")]
#[derive(Debug, Clone, Copy, Default)]
pub struct Json<T>(pub T);
95
/// Extracts a `T` by deserializing the message payload as JSON.
///
/// # Panics
///
/// Panics when `deserialize_json()` on the payload returns an error.
#[cfg(feature = "json")]
impl<T> FromMsg for Json<T>
where
    T: serde::de::DeserializeOwned,
{
    /// Reads the payload via `message()` and decodes it into `T`.
    /// The application handle is unused.
    fn from_msg(message: &SeaMessage, _: &App) -> Self {
        let payload = message.message();
        let parsed: T = payload
            .deserialize_json()
            .expect("stream message parse as json failed");
        Json(parsed)
    }
}