1use crate::event_store::client::projections;
2use crate::grpc::{ClientSettings, GrpcClient};
3use crate::options::projections::{
4 CreateProjectionOptions, DeleteProjectionOptions, GenericProjectionOptions,
5 GetResultProjectionOptions, GetStateProjectionOptions, UpdateProjectionOptions,
6};
7use futures::{TryStreamExt, stream::BoxStream};
8use serde::de::DeserializeOwned;
9
/// Selects which projections a statistics request should report on.
///
/// Each variant is translated into the matching
/// `projections::statistics_req::options::Mode` by `ProjectionClient::statistics`.
// `dead_code` is allowed because not every variant is constructed in this
// file (`AllProjections` has no visible constructor call here).
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub(crate) enum StatsFor {
    /// Statistics for the single projection with the given name.
    Name(String),
    /// Statistics for all projections (sent as `Mode::All`).
    AllProjections,
    /// Statistics for continuous projections only (sent as `Mode::Continuous`).
    AllContinuous,
}
17
/// Statistics of a single projection as returned by the server.
///
/// Every field is copied verbatim from the same-named field of the `details`
/// message of a statistics response (see `ProjectionClient::statistics`); no
/// value is interpreted or converted on the client side.
#[derive(Debug, Clone)]
pub struct ProjectionStatus {
    // Timing and versioning counters.
    pub core_processing_time: i64,
    pub version: i64,
    pub epoch: i64,
    pub effective_name: String,

    // In-flight work counters.
    pub writes_in_progress: i32,
    pub reads_in_progress: i32,
    pub partitions_cached: i32,

    // Textual state, passed through as plain strings.
    pub status: String,
    pub state_reason: String,
    pub name: String,
    pub mode: String,
    pub position: String,
    pub progress: f32,

    // Checkpointing and buffering figures.
    pub last_checkpoint: String,
    pub events_processed_after_restart: i64,
    pub checkpoint_status: String,
    pub buffered_events: i64,
    pub write_pending_events_before_checkpoint: i32,
    pub write_pending_events_after_checkpoint: i32,
}
40
41#[derive(Clone)]
42pub struct ProjectionClient {
43 client: GrpcClient,
44}
45
46impl ProjectionClient {
47 pub fn new(settings: ClientSettings) -> eyre::Result<Self> {
48 ProjectionClient::with_runtime_handle(tokio::runtime::Handle::current(), settings)
49 }
50
51 pub fn with_runtime_handle(
52 handle: tokio::runtime::Handle,
53 settings: ClientSettings,
54 ) -> eyre::Result<Self> {
55 let client = GrpcClient::create(handle, settings)?;
56
57 Ok(ProjectionClient { client })
58 }
59
60 pub fn settings(&self) -> &ClientSettings {
61 self.client.connection_settings()
62 }
63
64 pub async fn create<Name>(
65 &self,
66 name: Name,
67 query: String,
68 options: &CreateProjectionOptions,
69 ) -> crate::Result<()>
70 where
71 Name: AsRef<str>,
72 {
73 self.create_projection_internal(
74 options,
75 projections::create_req::Options {
76 query: query.clone(),
77 engine_version: options.engine_version.as_i32(),
78 mode: Some(projections::create_req::options::Mode::Continuous(
79 projections::create_req::options::Continuous {
80 name: name.as_ref().to_string(),
81 track_emitted_streams: options.track_emitted_streams,
82 },
83 )),
84 },
85 )
86 .await?;
87
88 if options.emit {
91 let upd_options = UpdateProjectionOptions::default().emit(true);
92
93 self.update(name.as_ref(), query, &upd_options).await?;
94 }
95
96 Ok(())
97 }
98
99 async fn create_projection_internal<Opts>(
100 &self,
101 create_opts: &Opts,
102 options: projections::create_req::Options,
103 ) -> crate::Result<()>
104 where
105 Opts: crate::options::Options,
106 {
107 let req = projections::CreateReq {
108 options: Some(options),
109 };
110
111 let req = crate::commands::new_request(self.client.connection_settings(), create_opts, req);
112
113 self.client
114 .execute(|handle| async move {
115 let mut client = projections::projections_client::ProjectionsClient::with_origin(
116 handle.client,
117 handle.uri,
118 );
119 let _ = client.create(req).await?;
120
121 Ok(())
122 })
123 .await
124 }
125
126 pub async fn update<Name>(
127 &self,
128 name: Name,
129 query: String,
130 options: &UpdateProjectionOptions,
131 ) -> crate::Result<()>
132 where
133 Name: AsRef<str>,
134 {
135 let req_options = projections::update_req::Options {
136 name: name.as_ref().to_string(),
137 emit_option: options
138 .emit
139 .as_ref()
140 .copied()
141 .map(projections::update_req::options::EmitOption::EmitEnabled)
142 .or(Some(
143 projections::update_req::options::EmitOption::NoEmitOptions(()),
144 )),
145 query,
146 };
147
148 let req = projections::UpdateReq {
149 options: Some(req_options),
150 };
151
152 let req = crate::commands::new_request(self.client.connection_settings(), options, req);
153
154 self.client
155 .execute(|handle| async move {
156 let mut client = projections::projections_client::ProjectionsClient::with_origin(
157 handle.client.clone(),
158 handle.uri.clone(),
159 );
160
161 let _ = client.update(req).await?;
162
163 Ok(())
164 })
165 .await
166 }
167
168 pub async fn delete<Name>(
169 &self,
170 name: Name,
171 options: &DeleteProjectionOptions,
172 ) -> crate::Result<()>
173 where
174 Name: AsRef<str>,
175 {
176 let req_options = projections::delete_req::Options {
177 name: name.as_ref().to_string(),
178 delete_emitted_streams: options.delete_emitted_streams,
179 delete_state_stream: options.delete_state_stream,
180 delete_checkpoint_stream: options.delete_checkpoint_stream,
181 };
182
183 let req = projections::DeleteReq {
184 options: Some(req_options),
185 };
186
187 let req = crate::commands::new_request(self.client.connection_settings(), options, req);
188
189 self.client
190 .execute(|handle| async move {
191 let mut client = projections::projections_client::ProjectionsClient::with_origin(
192 handle.client.clone(),
193 handle.uri.clone(),
194 );
195
196 let _ = client.delete(req).await?;
197
198 Ok(())
199 })
200 .await
201 }
202
203 pub async fn get_status<Name>(
204 &self,
205 name: Name,
206 options: &GenericProjectionOptions,
207 ) -> crate::Result<Option<ProjectionStatus>>
208 where
209 Name: AsRef<str>,
210 {
211 self.statistics(StatsFor::Name(name.as_ref().to_string()), options)
212 .await?
213 .try_next()
214 .await
215 }
216
217 pub async fn list(
218 &self,
219 options: &GenericProjectionOptions,
220 ) -> crate::Result<BoxStream<'_, crate::Result<ProjectionStatus>>> {
221 self.statistics(StatsFor::AllContinuous, options).await
222 }
223
224 async fn statistics(
225 &self,
226 stats_for: StatsFor,
227 options: &GenericProjectionOptions,
228 ) -> crate::Result<BoxStream<'_, crate::Result<ProjectionStatus>>> {
229 let mode = match stats_for {
230 StatsFor::Name(name) => projections::statistics_req::options::Mode::Name(name),
231 StatsFor::AllProjections => projections::statistics_req::options::Mode::All(()),
232 StatsFor::AllContinuous => projections::statistics_req::options::Mode::Continuous(()),
233 };
234
235 let stats_options = projections::statistics_req::Options { mode: Some(mode) };
236
237 let req = projections::StatisticsReq {
238 options: Some(stats_options),
239 };
240
241 let req = crate::commands::new_request(self.client.connection_settings(), options, req);
242
243 self.client
244 .execute(|handle| async move {
245 let mut client =
246 projections::projections_client::ProjectionsClient::with_origin(handle.client.clone(), handle.uri.clone());
247
248 let mut stream = client.statistics(req).await?.into_inner();
249
250 let stream = async_stream::stream! {
251 loop {
252 match stream.try_next().await {
253 Err(e) => {
254 let e = crate::Error::from_grpc(e);
255
256 handle.report_error(&e);
257 yield Err(e);
258 break;
259 }
260
261 Ok(resp) => {
262 if let Some(resp) = resp {
263 let details = resp.details.expect("to be defined");
264 let details = ProjectionStatus {
265 core_processing_time: details.core_processing_time,
266 version: details.version,
267 epoch: details.epoch,
268 effective_name: details.effective_name,
269 writes_in_progress: details.writes_in_progress,
270 reads_in_progress: details.reads_in_progress,
271 partitions_cached: details.partitions_cached,
272 status: details.status,
273 state_reason: details.state_reason,
274 name: details.name,
275 mode: details.mode,
276 position: details.position,
277 progress: details.progress,
278 last_checkpoint: details.last_checkpoint,
279 events_processed_after_restart: details.events_processed_after_restart,
280 checkpoint_status: details.checkpoint_status,
281 buffered_events: details.buffered_events,
282 write_pending_events_after_checkpoint: details.write_pending_events_after_checkpoint,
283 write_pending_events_before_checkpoint: details.write_pending_events_before_checkpoint,
284 };
285
286 yield Ok(details);
287 continue;
288 }
289
290 break;
291 }
292 }
293 }
294 };
295
296 let stream: BoxStream<crate::Result<ProjectionStatus>> = Box::pin(stream);
297
298 Ok(stream)
299 })
300 .await
301 }
302
303 pub async fn enable<Name>(
304 &self,
305 name: Name,
306 options: &GenericProjectionOptions,
307 ) -> crate::Result<()>
308 where
309 Name: AsRef<str>,
310 {
311 let req_options = projections::enable_req::Options {
312 name: name.as_ref().to_string(),
313 };
314
315 let req = projections::EnableReq {
316 options: Some(req_options),
317 };
318
319 let req = crate::commands::new_request(self.client.connection_settings(), options, req);
320
321 self.client
322 .execute(|handle| async move {
323 let mut client = projections::projections_client::ProjectionsClient::with_origin(
324 handle.client.clone(),
325 handle.uri.clone(),
326 );
327
328 let _ = client.enable(req).await?;
329
330 Ok(())
331 })
332 .await
333 }
334
335 pub async fn reset<Name>(
336 &self,
337 name: Name,
338 options: &GenericProjectionOptions,
339 ) -> crate::Result<()>
340 where
341 Name: AsRef<str>,
342 {
343 let req_options = projections::reset_req::Options {
344 name: name.as_ref().to_string(),
345 write_checkpoint: false,
346 };
347
348 let req = projections::ResetReq {
349 options: Some(req_options),
350 };
351
352 let req = crate::commands::new_request(self.client.connection_settings(), options, req);
353
354 self.client
355 .execute(|handle| async move {
356 let mut client = projections::projections_client::ProjectionsClient::with_origin(
357 handle.client.clone(),
358 handle.uri.clone(),
359 );
360
361 let _ = client.reset(req).await?;
362
363 Ok(())
364 })
365 .await
366 }
367
368 pub async fn disable<Name>(
369 &self,
370 name: Name,
371 options: &GenericProjectionOptions,
372 ) -> crate::Result<()>
373 where
374 Name: AsRef<str>,
375 {
376 self.disable_projection_internal(name, true, options).await
377 }
378
379 pub async fn abort<Name>(
380 &self,
381 name: Name,
382 options: &GenericProjectionOptions,
383 ) -> crate::Result<()>
384 where
385 Name: AsRef<str>,
386 {
387 self.disable_projection_internal(name, false, options).await
388 }
389
390 async fn disable_projection_internal<Name>(
391 &self,
392 name: Name,
393 write_checkpoint: bool,
394 options: &GenericProjectionOptions,
395 ) -> crate::Result<()>
396 where
397 Name: AsRef<str>,
398 {
399 let req_options = projections::disable_req::Options {
400 name: name.as_ref().to_string(),
401 write_checkpoint,
402 };
403
404 let req = projections::DisableReq {
405 options: Some(req_options),
406 };
407
408 let req = crate::commands::new_request(self.client.connection_settings(), options, req);
409
410 self.client
411 .execute(|handle| async move {
412 let mut client = projections::projections_client::ProjectionsClient::with_origin(
413 handle.client.clone(),
414 handle.uri.clone(),
415 );
416
417 let _ = client.disable(req).await?;
418
419 Ok(())
420 })
421 .await
422 }
423
424 pub async fn get_state<Name, A>(
425 &self,
426 name: Name,
427 options: &GetStateProjectionOptions,
428 ) -> crate::Result<serde_json::Result<A>>
429 where
430 Name: AsRef<str>,
431 A: DeserializeOwned + Send,
432 {
433 let req_options = projections::state_req::Options {
434 name: name.as_ref().to_string(),
435 partition: options.partition.clone(),
436 };
437
438 let req = projections::StateReq {
439 options: Some(req_options),
440 };
441
442 let req = crate::commands::new_request(self.client.connection_settings(), options, req);
443
444 self.client
445 .execute(|handle| async move {
446 let mut client = projections::projections_client::ProjectionsClient::with_origin(
447 handle.client.clone(),
448 handle.uri.clone(),
449 );
450
451 let resp = client.state(req).await?.into_inner();
452 let value = resp
453 .state
454 .map(parse_value)
455 .unwrap_or(serde_json::Value::Null);
456
457 Ok(serde_json::from_value(value))
458 })
459 .await
460 }
461
462 pub async fn get_result<Name, A>(
463 &self,
464 name: Name,
465 options: &GetResultProjectionOptions,
466 ) -> crate::Result<serde_json::Result<A>>
467 where
468 Name: AsRef<str>,
469 A: DeserializeOwned + Send,
470 {
471 let req_options = projections::result_req::Options {
472 name: name.as_ref().to_string(),
473 partition: options.partition.clone(),
474 };
475
476 let req = projections::ResultReq {
477 options: Some(req_options),
478 };
479
480 let req = crate::commands::new_request(self.client.connection_settings(), options, req);
481
482 self.client
483 .execute(|handle| async move {
484 let mut client = projections::projections_client::ProjectionsClient::with_origin(
485 handle.client.clone(),
486 handle.uri.clone(),
487 );
488
489 let resp = client.result(req).await?.into_inner();
490 let value = resp
491 .result
492 .map(parse_value)
493 .unwrap_or(serde_json::Value::Null);
494
495 Ok(serde_json::from_value(value))
496 })
497 .await
498 }
499
500 pub async fn restart_subsystem(&self, options: &GenericProjectionOptions) -> crate::Result<()> {
501 let req = crate::commands::new_request(self.client.connection_settings(), options, ());
502
503 self.client
504 .execute(|handle| async {
505 let mut client = projections::projections_client::ProjectionsClient::with_origin(
506 handle.client,
507 handle.uri,
508 );
509 let _ = client.restart_subsystem(req).await?;
510
511 Ok(())
512 })
513 .await
514 }
515}
516
517fn parse_value(value: prost_types::Value) -> serde_json::Value {
518 enum Stack {
519 List(
520 std::vec::IntoIter<prost_types::Value>,
521 Vec<serde_json::Value>,
522 ),
523 Map(
524 std::collections::btree_map::IntoIter<String, prost_types::Value>,
525 String,
526 serde_json::Map<String, serde_json::Value>,
527 ),
528 Value(serde_json::Value),
529 Proto(prost_types::Value),
530 }
531
532 let mut stack = vec![Stack::Proto(value)];
533 let mut param: Option<serde_json::Value> = None;
534
535 while let Some(elem) = stack.pop() {
536 match elem {
537 Stack::Value(value) => {
538 param = Some(value);
539 }
540
541 Stack::Map(mut iter, scope, mut acc) => {
542 let value = param.take().expect("to be defined");
543 acc.insert(scope, value);
544
545 if let Some((scope, value)) = iter.next() {
546 stack.push(Stack::Map(iter, scope, acc));
547 stack.push(Stack::Proto(value));
548 } else {
549 stack.push(Stack::Value(serde_json::Value::Object(acc)));
550 }
551 }
552
553 Stack::List(mut iter, mut acc) => {
554 let value = param.take().expect("to be defined");
555 acc.push(value);
556
557 if let Some(value) = iter.next() {
558 stack.push(Stack::List(iter, acc));
559 stack.push(Stack::Proto(value));
560 } else {
561 stack.push(Stack::Value(serde_json::Value::Array(acc)));
562 }
563 }
564
565 Stack::Proto(proto_value) => {
566 if let Some(kind) = proto_value.kind {
567 match kind {
568 prost_types::value::Kind::NullValue(_) => {
569 stack.push(Stack::Value(serde_json::Value::Null));
570 }
571
572 prost_types::value::Kind::NumberValue(value) => {
573 stack.push(Stack::Value(serde_json::Value::Number(
574 serde_json::Number::from_f64(value).expect("to be valid"),
575 )));
576 }
577
578 prost_types::value::Kind::StringValue(value) => {
579 stack.push(Stack::Value(serde_json::Value::String(value)));
580 }
581
582 prost_types::value::Kind::BoolValue(value) => {
583 stack.push(Stack::Value(serde_json::Value::Bool(value)));
584 }
585
586 prost_types::value::Kind::StructValue(obj) => {
587 let mut iter = obj.fields.into_iter();
588
589 if let Some((key, value)) = iter.next() {
590 stack.push(Stack::Map(iter, key, serde_json::Map::new()));
591 stack.push(Stack::Proto(value));
592 } else {
593 stack.push(Stack::Value(serde_json::Value::Object(
594 serde_json::Map::new(),
595 )));
596 }
597 }
598
599 prost_types::value::Kind::ListValue(list) => {
600 let mut iter = list.values.into_iter();
601
602 if let Some(value) = iter.next() {
603 stack.push(Stack::List(iter, Vec::new()));
604 stack.push(Stack::Proto(value));
605 } else {
606 stack.push(Stack::Value(serde_json::Value::Array(Vec::new())));
607 }
608 }
609 }
610 } else {
611 stack.push(Stack::Value(serde_json::Value::Null));
612 }
613 }
614 }
615 }
616
617 param.expect("not empty")
618}
619
620impl From<crate::Client> for ProjectionClient {
621 fn from(src: crate::Client) -> Self {
622 Self { client: src.client }
623 }
624}