Skip to main content

kurrentdb/
projection_client.rs

1use crate::event_store::client::projections;
2use crate::grpc::{ClientSettings, GrpcClient};
3use crate::options::projections::{
4    CreateProjectionOptions, DeleteProjectionOptions, GenericProjectionOptions,
5    GetResultProjectionOptions, GetStateProjectionOptions, UpdateProjectionOptions,
6};
7use futures::{TryStreamExt, stream::BoxStream};
8use serde::de::DeserializeOwned;
9
/// Selects which projections a statistics request should cover.
///
/// Each variant is translated into the matching
/// `projections::statistics_req::options::Mode` when the request is built.
// NOTE(review): `AllProjections` is never constructed in this file, which is
// presumably why the `dead_code` lint is silenced — confirm before removing.
#[allow(dead_code)]
#[derive(Clone, Debug)]
pub(crate) enum StatsFor {
    /// Only the projection with the given name (statistics mode `Name`).
    Name(String),
    /// Statistics mode `All`.
    AllProjections,
    /// Statistics mode `Continuous`.
    AllContinuous,
}
17
/// Statistics reported for a single projection.
///
/// Every field is copied one-to-one from the `details` payload of a
/// statistics response; no value is interpreted or transformed by this client.
// NOTE(review): the exact meaning and units of the individual counters are
// defined by the server, not by this file — consult the server documentation.
#[derive(Clone, Debug)]
pub struct ProjectionStatus {
    pub core_processing_time: i64,
    pub version: i64,
    pub epoch: i64,
    pub effective_name: String,
    pub writes_in_progress: i32,
    pub reads_in_progress: i32,
    pub partitions_cached: i32,
    pub status: String,
    pub state_reason: String,
    pub name: String,
    pub mode: String,
    pub position: String,
    pub progress: f32,
    pub last_checkpoint: String,
    pub events_processed_after_restart: i64,
    pub checkpoint_status: String,
    pub buffered_events: i64,
    pub write_pending_events_before_checkpoint: i32,
    pub write_pending_events_after_checkpoint: i32,
}
40
/// Client exposing projection management operations (create, update, delete,
/// enable, disable, reset, statistics, state and result queries) on top of a
/// [`GrpcClient`].
#[derive(Clone)]
pub struct ProjectionClient {
    // Underlying gRPC client through which every projection request is executed.
    client: GrpcClient,
}
45
46impl ProjectionClient {
47    pub fn new(settings: ClientSettings) -> eyre::Result<Self> {
48        ProjectionClient::with_runtime_handle(tokio::runtime::Handle::current(), settings)
49    }
50
51    pub fn with_runtime_handle(
52        handle: tokio::runtime::Handle,
53        settings: ClientSettings,
54    ) -> eyre::Result<Self> {
55        let client = GrpcClient::create(handle, settings)?;
56
57        Ok(ProjectionClient { client })
58    }
59
60    pub fn settings(&self) -> &ClientSettings {
61        self.client.connection_settings()
62    }
63
64    pub async fn create<Name>(
65        &self,
66        name: Name,
67        query: String,
68        options: &CreateProjectionOptions,
69    ) -> crate::Result<()>
70    where
71        Name: AsRef<str>,
72    {
73        self.create_projection_internal(
74            options,
75            projections::create_req::Options {
76                query: query.clone(),
77                engine_version: options.engine_version.as_i32(),
78                mode: Some(projections::create_req::options::Mode::Continuous(
79                    projections::create_req::options::Continuous {
80                        name: name.as_ref().to_string(),
81                        track_emitted_streams: options.track_emitted_streams,
82                    },
83                )),
84            },
85        )
86        .await?;
87
88        // TODO - create projection RPC call needs to be fixed upstream where the emit options
89        // will be added to the API. Right now, do an extra RPC call to implement it.
90        if options.emit {
91            let upd_options = UpdateProjectionOptions::default().emit(true);
92
93            self.update(name.as_ref(), query, &upd_options).await?;
94        }
95
96        Ok(())
97    }
98
99    async fn create_projection_internal<Opts>(
100        &self,
101        create_opts: &Opts,
102        options: projections::create_req::Options,
103    ) -> crate::Result<()>
104    where
105        Opts: crate::options::Options,
106    {
107        let req = projections::CreateReq {
108            options: Some(options),
109        };
110
111        let req = crate::commands::new_request(self.client.connection_settings(), create_opts, req);
112
113        self.client
114            .execute(|handle| async move {
115                let mut client = projections::projections_client::ProjectionsClient::with_origin(
116                    handle.client,
117                    handle.uri,
118                );
119                let _ = client.create(req).await?;
120
121                Ok(())
122            })
123            .await
124    }
125
126    pub async fn update<Name>(
127        &self,
128        name: Name,
129        query: String,
130        options: &UpdateProjectionOptions,
131    ) -> crate::Result<()>
132    where
133        Name: AsRef<str>,
134    {
135        let req_options = projections::update_req::Options {
136            name: name.as_ref().to_string(),
137            emit_option: options
138                .emit
139                .as_ref()
140                .copied()
141                .map(projections::update_req::options::EmitOption::EmitEnabled)
142                .or(Some(
143                    projections::update_req::options::EmitOption::NoEmitOptions(()),
144                )),
145            query,
146        };
147
148        let req = projections::UpdateReq {
149            options: Some(req_options),
150        };
151
152        let req = crate::commands::new_request(self.client.connection_settings(), options, req);
153
154        self.client
155            .execute(|handle| async move {
156                let mut client = projections::projections_client::ProjectionsClient::with_origin(
157                    handle.client.clone(),
158                    handle.uri.clone(),
159                );
160
161                let _ = client.update(req).await?;
162
163                Ok(())
164            })
165            .await
166    }
167
168    pub async fn delete<Name>(
169        &self,
170        name: Name,
171        options: &DeleteProjectionOptions,
172    ) -> crate::Result<()>
173    where
174        Name: AsRef<str>,
175    {
176        let req_options = projections::delete_req::Options {
177            name: name.as_ref().to_string(),
178            delete_emitted_streams: options.delete_emitted_streams,
179            delete_state_stream: options.delete_state_stream,
180            delete_checkpoint_stream: options.delete_checkpoint_stream,
181        };
182
183        let req = projections::DeleteReq {
184            options: Some(req_options),
185        };
186
187        let req = crate::commands::new_request(self.client.connection_settings(), options, req);
188
189        self.client
190            .execute(|handle| async move {
191                let mut client = projections::projections_client::ProjectionsClient::with_origin(
192                    handle.client.clone(),
193                    handle.uri.clone(),
194                );
195
196                let _ = client.delete(req).await?;
197
198                Ok(())
199            })
200            .await
201    }
202
203    pub async fn get_status<Name>(
204        &self,
205        name: Name,
206        options: &GenericProjectionOptions,
207    ) -> crate::Result<Option<ProjectionStatus>>
208    where
209        Name: AsRef<str>,
210    {
211        self.statistics(StatsFor::Name(name.as_ref().to_string()), options)
212            .await?
213            .try_next()
214            .await
215    }
216
217    pub async fn list(
218        &self,
219        options: &GenericProjectionOptions,
220    ) -> crate::Result<BoxStream<'_, crate::Result<ProjectionStatus>>> {
221        self.statistics(StatsFor::AllContinuous, options).await
222    }
223
224    async fn statistics(
225        &self,
226        stats_for: StatsFor,
227        options: &GenericProjectionOptions,
228    ) -> crate::Result<BoxStream<'_, crate::Result<ProjectionStatus>>> {
229        let mode = match stats_for {
230            StatsFor::Name(name) => projections::statistics_req::options::Mode::Name(name),
231            StatsFor::AllProjections => projections::statistics_req::options::Mode::All(()),
232            StatsFor::AllContinuous => projections::statistics_req::options::Mode::Continuous(()),
233        };
234
235        let stats_options = projections::statistics_req::Options { mode: Some(mode) };
236
237        let req = projections::StatisticsReq {
238            options: Some(stats_options),
239        };
240
241        let req = crate::commands::new_request(self.client.connection_settings(), options, req);
242
243        self.client
244            .execute(|handle| async move {
245                let mut client =
246                    projections::projections_client::ProjectionsClient::with_origin(handle.client.clone(), handle.uri.clone());
247
248                let mut stream = client.statistics(req).await?.into_inner();
249
250                let stream = async_stream::stream! {
251                    loop {
252                        match stream.try_next().await {
253                            Err(e) => {
254                                let e = crate::Error::from_grpc(e);
255
256                                handle.report_error(&e);
257                                yield Err(e);
258                                break;
259                            }
260
261                            Ok(resp) => {
262                                if let Some(resp) = resp {
263                                    let details = resp.details.expect("to be defined");
264                                    let details = ProjectionStatus {
265                                        core_processing_time: details.core_processing_time,
266                                        version: details.version,
267                                        epoch: details.epoch,
268                                        effective_name: details.effective_name,
269                                        writes_in_progress: details.writes_in_progress,
270                                        reads_in_progress: details.reads_in_progress,
271                                        partitions_cached: details.partitions_cached,
272                                        status: details.status,
273                                        state_reason: details.state_reason,
274                                        name: details.name,
275                                        mode: details.mode,
276                                        position: details.position,
277                                        progress: details.progress,
278                                        last_checkpoint: details.last_checkpoint,
279                                        events_processed_after_restart: details.events_processed_after_restart,
280                                        checkpoint_status: details.checkpoint_status,
281                                        buffered_events: details.buffered_events,
282                                        write_pending_events_after_checkpoint: details.write_pending_events_after_checkpoint,
283                                        write_pending_events_before_checkpoint: details.write_pending_events_before_checkpoint,
284                                    };
285
286                                    yield Ok(details);
287                                    continue;
288                                }
289
290                                break;
291                            }
292                        }
293                    }
294                };
295
296                let stream: BoxStream<crate::Result<ProjectionStatus>> = Box::pin(stream);
297
298                Ok(stream)
299            })
300            .await
301    }
302
303    pub async fn enable<Name>(
304        &self,
305        name: Name,
306        options: &GenericProjectionOptions,
307    ) -> crate::Result<()>
308    where
309        Name: AsRef<str>,
310    {
311        let req_options = projections::enable_req::Options {
312            name: name.as_ref().to_string(),
313        };
314
315        let req = projections::EnableReq {
316            options: Some(req_options),
317        };
318
319        let req = crate::commands::new_request(self.client.connection_settings(), options, req);
320
321        self.client
322            .execute(|handle| async move {
323                let mut client = projections::projections_client::ProjectionsClient::with_origin(
324                    handle.client.clone(),
325                    handle.uri.clone(),
326                );
327
328                let _ = client.enable(req).await?;
329
330                Ok(())
331            })
332            .await
333    }
334
335    pub async fn reset<Name>(
336        &self,
337        name: Name,
338        options: &GenericProjectionOptions,
339    ) -> crate::Result<()>
340    where
341        Name: AsRef<str>,
342    {
343        let req_options = projections::reset_req::Options {
344            name: name.as_ref().to_string(),
345            write_checkpoint: false,
346        };
347
348        let req = projections::ResetReq {
349            options: Some(req_options),
350        };
351
352        let req = crate::commands::new_request(self.client.connection_settings(), options, req);
353
354        self.client
355            .execute(|handle| async move {
356                let mut client = projections::projections_client::ProjectionsClient::with_origin(
357                    handle.client.clone(),
358                    handle.uri.clone(),
359                );
360
361                let _ = client.reset(req).await?;
362
363                Ok(())
364            })
365            .await
366    }
367
368    pub async fn disable<Name>(
369        &self,
370        name: Name,
371        options: &GenericProjectionOptions,
372    ) -> crate::Result<()>
373    where
374        Name: AsRef<str>,
375    {
376        self.disable_projection_internal(name, true, options).await
377    }
378
379    pub async fn abort<Name>(
380        &self,
381        name: Name,
382        options: &GenericProjectionOptions,
383    ) -> crate::Result<()>
384    where
385        Name: AsRef<str>,
386    {
387        self.disable_projection_internal(name, false, options).await
388    }
389
390    async fn disable_projection_internal<Name>(
391        &self,
392        name: Name,
393        write_checkpoint: bool,
394        options: &GenericProjectionOptions,
395    ) -> crate::Result<()>
396    where
397        Name: AsRef<str>,
398    {
399        let req_options = projections::disable_req::Options {
400            name: name.as_ref().to_string(),
401            write_checkpoint,
402        };
403
404        let req = projections::DisableReq {
405            options: Some(req_options),
406        };
407
408        let req = crate::commands::new_request(self.client.connection_settings(), options, req);
409
410        self.client
411            .execute(|handle| async move {
412                let mut client = projections::projections_client::ProjectionsClient::with_origin(
413                    handle.client.clone(),
414                    handle.uri.clone(),
415                );
416
417                let _ = client.disable(req).await?;
418
419                Ok(())
420            })
421            .await
422    }
423
424    pub async fn get_state<Name, A>(
425        &self,
426        name: Name,
427        options: &GetStateProjectionOptions,
428    ) -> crate::Result<serde_json::Result<A>>
429    where
430        Name: AsRef<str>,
431        A: DeserializeOwned + Send,
432    {
433        let req_options = projections::state_req::Options {
434            name: name.as_ref().to_string(),
435            partition: options.partition.clone(),
436        };
437
438        let req = projections::StateReq {
439            options: Some(req_options),
440        };
441
442        let req = crate::commands::new_request(self.client.connection_settings(), options, req);
443
444        self.client
445            .execute(|handle| async move {
446                let mut client = projections::projections_client::ProjectionsClient::with_origin(
447                    handle.client.clone(),
448                    handle.uri.clone(),
449                );
450
451                let resp = client.state(req).await?.into_inner();
452                let value = resp
453                    .state
454                    .map(parse_value)
455                    .unwrap_or(serde_json::Value::Null);
456
457                Ok(serde_json::from_value(value))
458            })
459            .await
460    }
461
462    pub async fn get_result<Name, A>(
463        &self,
464        name: Name,
465        options: &GetResultProjectionOptions,
466    ) -> crate::Result<serde_json::Result<A>>
467    where
468        Name: AsRef<str>,
469        A: DeserializeOwned + Send,
470    {
471        let req_options = projections::result_req::Options {
472            name: name.as_ref().to_string(),
473            partition: options.partition.clone(),
474        };
475
476        let req = projections::ResultReq {
477            options: Some(req_options),
478        };
479
480        let req = crate::commands::new_request(self.client.connection_settings(), options, req);
481
482        self.client
483            .execute(|handle| async move {
484                let mut client = projections::projections_client::ProjectionsClient::with_origin(
485                    handle.client.clone(),
486                    handle.uri.clone(),
487                );
488
489                let resp = client.result(req).await?.into_inner();
490                let value = resp
491                    .result
492                    .map(parse_value)
493                    .unwrap_or(serde_json::Value::Null);
494
495                Ok(serde_json::from_value(value))
496            })
497            .await
498    }
499
500    pub async fn restart_subsystem(&self, options: &GenericProjectionOptions) -> crate::Result<()> {
501        let req = crate::commands::new_request(self.client.connection_settings(), options, ());
502
503        self.client
504            .execute(|handle| async {
505                let mut client = projections::projections_client::ProjectionsClient::with_origin(
506                    handle.client,
507                    handle.uri,
508                );
509                let _ = client.restart_subsystem(req).await?;
510
511                Ok(())
512            })
513            .await
514    }
515}
516
517fn parse_value(value: prost_types::Value) -> serde_json::Value {
518    enum Stack {
519        List(
520            std::vec::IntoIter<prost_types::Value>,
521            Vec<serde_json::Value>,
522        ),
523        Map(
524            std::collections::btree_map::IntoIter<String, prost_types::Value>,
525            String,
526            serde_json::Map<String, serde_json::Value>,
527        ),
528        Value(serde_json::Value),
529        Proto(prost_types::Value),
530    }
531
532    let mut stack = vec![Stack::Proto(value)];
533    let mut param: Option<serde_json::Value> = None;
534
535    while let Some(elem) = stack.pop() {
536        match elem {
537            Stack::Value(value) => {
538                param = Some(value);
539            }
540
541            Stack::Map(mut iter, scope, mut acc) => {
542                let value = param.take().expect("to be defined");
543                acc.insert(scope, value);
544
545                if let Some((scope, value)) = iter.next() {
546                    stack.push(Stack::Map(iter, scope, acc));
547                    stack.push(Stack::Proto(value));
548                } else {
549                    stack.push(Stack::Value(serde_json::Value::Object(acc)));
550                }
551            }
552
553            Stack::List(mut iter, mut acc) => {
554                let value = param.take().expect("to be defined");
555                acc.push(value);
556
557                if let Some(value) = iter.next() {
558                    stack.push(Stack::List(iter, acc));
559                    stack.push(Stack::Proto(value));
560                } else {
561                    stack.push(Stack::Value(serde_json::Value::Array(acc)));
562                }
563            }
564
565            Stack::Proto(proto_value) => {
566                if let Some(kind) = proto_value.kind {
567                    match kind {
568                        prost_types::value::Kind::NullValue(_) => {
569                            stack.push(Stack::Value(serde_json::Value::Null));
570                        }
571
572                        prost_types::value::Kind::NumberValue(value) => {
573                            stack.push(Stack::Value(serde_json::Value::Number(
574                                serde_json::Number::from_f64(value).expect("to be valid"),
575                            )));
576                        }
577
578                        prost_types::value::Kind::StringValue(value) => {
579                            stack.push(Stack::Value(serde_json::Value::String(value)));
580                        }
581
582                        prost_types::value::Kind::BoolValue(value) => {
583                            stack.push(Stack::Value(serde_json::Value::Bool(value)));
584                        }
585
586                        prost_types::value::Kind::StructValue(obj) => {
587                            let mut iter = obj.fields.into_iter();
588
589                            if let Some((key, value)) = iter.next() {
590                                stack.push(Stack::Map(iter, key, serde_json::Map::new()));
591                                stack.push(Stack::Proto(value));
592                            } else {
593                                stack.push(Stack::Value(serde_json::Value::Object(
594                                    serde_json::Map::new(),
595                                )));
596                            }
597                        }
598
599                        prost_types::value::Kind::ListValue(list) => {
600                            let mut iter = list.values.into_iter();
601
602                            if let Some(value) = iter.next() {
603                                stack.push(Stack::List(iter, Vec::new()));
604                                stack.push(Stack::Proto(value));
605                            } else {
606                                stack.push(Stack::Value(serde_json::Value::Array(Vec::new())));
607                            }
608                        }
609                    }
610                } else {
611                    stack.push(Stack::Value(serde_json::Value::Null));
612                }
613            }
614        }
615    }
616
617    param.expect("not empty")
618}
619
620impl From<crate::Client> for ProjectionClient {
621    fn from(src: crate::Client) -> Self {
622        Self { client: src.client }
623    }
624}