postgres_es2/queries/
generic_query_repository.rs1use std::marker::PhantomData;
2
3use postgres::Client;
4
5use cqrs_es2::{
6 Aggregate,
7 AggregateError,
8 EventEnvelope,
9 Query,
10 QueryProcessor,
11};
12
13use super::query_context::QueryContext;
14
15pub struct GenericQueryRepository<V, A>
18where
19 V: Query<A>,
20 A: Aggregate, {
21 conn: Client,
22 query_name: String,
23 error_handler: Option<Box<ErrorHandler>>,
24 _phantom: PhantomData<(V, A)>,
25}
26
27type ErrorHandler = dyn Fn(AggregateError);
28
29impl<V, A> GenericQueryRepository<V, A>
30where
31 V: Query<A>,
32 A: Aggregate,
33{
34 #[must_use]
39 pub fn new(
40 query_name: &str,
41 conn: Client,
42 ) -> Self {
43 GenericQueryRepository {
44 conn,
45 query_name: query_name.to_string(),
46 error_handler: None,
47 _phantom: PhantomData,
48 }
49 }
50 pub fn with_error_handler(
52 &mut self,
53 error_handler: Box<ErrorHandler>,
54 ) {
55 self.error_handler = Some(error_handler);
56 }
57
58 #[must_use]
60 pub fn view_name(&self) -> String {
61 self.query_name.to_string()
62 }
63
64 fn load_mut(
65 &mut self,
66 query_instance_id: String,
67 ) -> Result<(V, QueryContext<V>), AggregateError> {
68 let query = format!(
69 "SELECT version,payload FROM {} WHERE \
70 query_instance_id= $1",
71 &self.query_name
72 );
73 let result = match self
74 .conn
75 .query(query.as_str(), &[&query_instance_id])
76 {
77 Ok(result) => result,
78 Err(e) => {
79 return Err(AggregateError::new(
80 e.to_string().as_str(),
81 ));
82 },
83 };
84 match result.iter().next() {
85 Some(row) => {
86 let view_name = self.query_name.clone();
87 let version = row.get("version");
88 let payload = row.get("payload");
89 let view = serde_json::from_value(payload)?;
90 let view_context = QueryContext::new(
91 view_name,
92 query_instance_id,
93 version,
94 PhantomData,
95 );
96 Ok((view, view_context))
97 },
98 None => {
99 let view_context = QueryContext::new(
100 self.query_name.clone(),
101 query_instance_id,
102 0,
103 PhantomData,
104 );
105 Ok((Default::default(), view_context))
106 },
107 }
108 }
109
110 pub fn apply_events(
112 &mut self,
113 query_instance_id: &str,
114 events: &[EventEnvelope<A>],
115 ) {
116 match self.load_mut(query_instance_id.to_string()) {
117 Ok((mut view, mut view_context)) => {
118 for event in events {
119 view.update(event);
120 }
121 view_context.commit(&mut self.conn, view);
122 },
123 Err(e) => {
124 match &self.error_handler {
125 None => {},
126 Some(handler) => {
127 (handler)(e);
128 },
129 }
130 },
131 };
132 }
133
134 pub fn load(
136 &mut self,
137 query_instance_id: String,
138 ) -> Option<V> {
139 let query = format!(
140 "SELECT version,payload FROM {} WHERE \
141 query_instance_id= $1",
142 &self.query_name
143 );
144 let result = match self
145 .conn
146 .query(query.as_str(), &[&query_instance_id])
147 {
148 Ok(result) => result,
149 Err(err) => {
150 panic!(
151 "unable to load view '{}' with id: '{}', \
152 encountered: {}",
153 &query_instance_id, &self.query_name, err
154 );
155 },
156 };
157 match result.iter().next() {
158 Some(row) => {
159 let payload = row.get("payload");
160 match serde_json::from_value(payload) {
161 Ok(view) => Some(view),
162 Err(e) => {
163 match &self.error_handler {
164 None => {},
165 Some(handler) => {
166 (handler)(e.into());
167 },
168 }
169 None
170 },
171 }
172 },
173 None => None,
174 }
175 }
176}
177
178impl<Q, A> QueryProcessor<A> for GenericQueryRepository<Q, A>
179where
180 Q: Query<A>,
181 A: Aggregate,
182{
183 fn dispatch(
184 &mut self,
185 query_instance_id: &str,
186 events: &[EventEnvelope<A>],
187 ) {
188 self.apply_events(&query_instance_id.to_string(), &events);
189 }
190}