Skip to main content

summer_pubsub/
extractor.rs

1use crate::handler::PubSubEnvelope;
2use crate::message::Message;
3use summer::app::App;
4use summer::config::ConfigRegistry;
5use summer::config::Configurable;
6use summer::extractor::Component;
7use summer::extractor::Config;
8use summer::plugin::ComponentRegistry;
9
10pub trait FromPubSubMsg: Sized {
11    fn from_pubsub(
12        grpc: &google_cloud_pubsub::model::Message,
13        env: &PubSubEnvelope,
14        app: &App,
15    ) -> Self;
16}
17
18impl FromPubSubMsg for Message {
19    fn from_pubsub(
20        grpc: &google_cloud_pubsub::model::Message,
21        env: &PubSubEnvelope,
22        _app: &App,
23    ) -> Self {
24        Message::new(
25            grpc.message_id.clone(),
26            grpc.data.clone(),
27            grpc.attributes.clone(),
28            env.ack.clone(),
29        )
30    }
31}
32
33impl<T> FromPubSubMsg for Component<T>
34where
35    T: Clone + Send + Sync + 'static,
36{
37    fn from_pubsub(
38        _grpc: &google_cloud_pubsub::model::Message,
39        _env: &PubSubEnvelope,
40        app: &App,
41    ) -> Self {
42        match app.get_component_ref::<T>() {
43            Some(component) => Component(T::clone(&component)),
44            None => panic!(
45                "There is no component of `{}` type",
46                std::any::type_name::<T>()
47            ),
48        }
49    }
50}
51
52impl<T> FromPubSubMsg for Config<T>
53where
54    T: serde::de::DeserializeOwned + Configurable,
55{
56    fn from_pubsub(
57        _grpc: &google_cloud_pubsub::model::Message,
58        _env: &PubSubEnvelope,
59        app: &App,
60    ) -> Self {
61        match app.get_config::<T>() {
62            Ok(config) => Config(config),
63            Err(e) => panic!(
64                "get config failed for typeof {}: {}",
65                std::any::type_name::<T>(),
66                e
67            ),
68        }
69    }
70}