postgres_es2/queries/
generic_query_repository.rs

1use std::marker::PhantomData;
2
3use postgres::Client;
4
5use cqrs_es2::{
6    Aggregate,
7    AggregateError,
8    EventEnvelope,
9    Query,
10    QueryProcessor,
11};
12
13use super::query_context::QueryContext;
14
15/// This provides a simple query repository that can be used both to
16/// return deserialized views and to act as a query processor.
17pub struct GenericQueryRepository<V, A>
18where
19    V: Query<A>,
20    A: Aggregate, {
21    conn: Client,
22    query_name: String,
23    error_handler: Option<Box<ErrorHandler>>,
24    _phantom: PhantomData<(V, A)>,
25}
26
27type ErrorHandler = dyn Fn(AggregateError);
28
29impl<V, A> GenericQueryRepository<V, A>
30where
31    V: Query<A>,
32    A: Aggregate,
33{
34    /// Creates a new `GenericQueryRepository` that will store its'
35    /// views in the table named identically to the `query_name`
36    /// value provided. This table should be created by the user
37    /// previously (see `/db/init.sql`).
38    #[must_use]
39    pub fn new(
40        query_name: &str,
41        conn: Client,
42    ) -> Self {
43        GenericQueryRepository {
44            conn,
45            query_name: query_name.to_string(),
46            error_handler: None,
47            _phantom: PhantomData,
48        }
49    }
50    /// Since inbound views cannot
51    pub fn with_error_handler(
52        &mut self,
53        error_handler: Box<ErrorHandler>,
54    ) {
55        self.error_handler = Some(error_handler);
56    }
57
58    /// Returns the originally configured view name.
59    #[must_use]
60    pub fn view_name(&self) -> String {
61        self.query_name.to_string()
62    }
63
64    fn load_mut(
65        &mut self,
66        query_instance_id: String,
67    ) -> Result<(V, QueryContext<V>), AggregateError> {
68        let query = format!(
69            "SELECT version,payload FROM {} WHERE \
70             query_instance_id= $1",
71            &self.query_name
72        );
73        let result = match self
74            .conn
75            .query(query.as_str(), &[&query_instance_id])
76        {
77            Ok(result) => result,
78            Err(e) => {
79                return Err(AggregateError::new(
80                    e.to_string().as_str(),
81                ));
82            },
83        };
84        match result.iter().next() {
85            Some(row) => {
86                let view_name = self.query_name.clone();
87                let version = row.get("version");
88                let payload = row.get("payload");
89                let view = serde_json::from_value(payload)?;
90                let view_context = QueryContext::new(
91                    view_name,
92                    query_instance_id,
93                    version,
94                    PhantomData,
95                );
96                Ok((view, view_context))
97            },
98            None => {
99                let view_context = QueryContext::new(
100                    self.query_name.clone(),
101                    query_instance_id,
102                    0,
103                    PhantomData,
104                );
105                Ok((Default::default(), view_context))
106            },
107        }
108    }
109
110    /// Used to apply committed events to a view.
111    pub fn apply_events(
112        &mut self,
113        query_instance_id: &str,
114        events: &[EventEnvelope<A>],
115    ) {
116        match self.load_mut(query_instance_id.to_string()) {
117            Ok((mut view, mut view_context)) => {
118                for event in events {
119                    view.update(event);
120                }
121                view_context.commit(&mut self.conn, view);
122            },
123            Err(e) => {
124                match &self.error_handler {
125                    None => {},
126                    Some(handler) => {
127                        (handler)(e);
128                    },
129                }
130            },
131        };
132    }
133
134    /// Loads and deserializes a view based on the view id.
135    pub fn load(
136        &mut self,
137        query_instance_id: String,
138    ) -> Option<V> {
139        let query = format!(
140            "SELECT version,payload FROM {} WHERE \
141             query_instance_id= $1",
142            &self.query_name
143        );
144        let result = match self
145            .conn
146            .query(query.as_str(), &[&query_instance_id])
147        {
148            Ok(result) => result,
149            Err(err) => {
150                panic!(
151                    "unable to load view '{}' with id: '{}', \
152                     encountered: {}",
153                    &query_instance_id, &self.query_name, err
154                );
155            },
156        };
157        match result.iter().next() {
158            Some(row) => {
159                let payload = row.get("payload");
160                match serde_json::from_value(payload) {
161                    Ok(view) => Some(view),
162                    Err(e) => {
163                        match &self.error_handler {
164                            None => {},
165                            Some(handler) => {
166                                (handler)(e.into());
167                            },
168                        }
169                        None
170                    },
171                }
172            },
173            None => None,
174        }
175    }
176}
177
178impl<Q, A> QueryProcessor<A> for GenericQueryRepository<Q, A>
179where
180    Q: Query<A>,
181    A: Aggregate,
182{
183    fn dispatch(
184        &mut self,
185        query_instance_id: &str,
186        events: &[EventEnvelope<A>],
187    ) {
188        self.apply_events(&query_instance_id.to_string(), &events);
189    }
190}