summer_pubsub/
extractor.rs1use crate::handler::PubSubEnvelope;
2use crate::message::Message;
3use summer::app::App;
4use summer::config::ConfigRegistry;
5use summer::config::Configurable;
6use summer::extractor::Component;
7use summer::extractor::Config;
8use summer::plugin::ComponentRegistry;
9
10pub trait FromPubSubMsg: Sized {
11 fn from_pubsub(
12 grpc: &google_cloud_pubsub::model::Message,
13 env: &PubSubEnvelope,
14 app: &App,
15 ) -> Self;
16}
17
18impl FromPubSubMsg for Message {
19 fn from_pubsub(
20 grpc: &google_cloud_pubsub::model::Message,
21 env: &PubSubEnvelope,
22 _app: &App,
23 ) -> Self {
24 Message::new(
25 grpc.message_id.clone(),
26 grpc.data.clone(),
27 grpc.attributes.clone(),
28 env.ack.clone(),
29 )
30 }
31}
32
33impl<T> FromPubSubMsg for Component<T>
34where
35 T: Clone + Send + Sync + 'static,
36{
37 fn from_pubsub(
38 _grpc: &google_cloud_pubsub::model::Message,
39 _env: &PubSubEnvelope,
40 app: &App,
41 ) -> Self {
42 match app.get_component_ref::<T>() {
43 Some(component) => Component(T::clone(&component)),
44 None => panic!(
45 "There is no component of `{}` type",
46 std::any::type_name::<T>()
47 ),
48 }
49 }
50}
51
52impl<T> FromPubSubMsg for Config<T>
53where
54 T: serde::de::DeserializeOwned + Configurable,
55{
56 fn from_pubsub(
57 _grpc: &google_cloud_pubsub::model::Message,
58 _env: &PubSubEnvelope,
59 app: &App,
60 ) -> Self {
61 match app.get_config::<T>() {
62 Ok(config) => Config(config),
63 Err(e) => panic!(
64 "get config failed for typeof {}: {}",
65 std::any::type_name::<T>(),
66 e
67 ),
68 }
69 }
70}