1use std::fmt;
2
3use aok::{OK, Void};
4use fred::{
5 interfaces::{ClientLike, StreamsInterface},
6 prelude::FredResult,
7 types::{CustomCommand, Value},
8};
9use xkv::R;
10
11use crate::{Conf, Parse, StreamItem, auto_new, parse_stream};
12
13pub struct ReadGroup<P: Parse> {
14 conf: Conf,
15 parse: P,
16 args: Vec<Value>,
17}
18
19impl<P: Parse + Send + Sync + 'static> ReadGroup<P> {
20 pub fn new(parse: P, conf: Conf) -> Self {
21 let count_str = conf.count.to_string();
22 let claim_str = conf.claim_idle_ms.to_string();
23
24 let mut args: Vec<Value> = vec![
25 "GROUP".into(),
26 conf.group.clone().into(),
27 conf.consumer.clone().into(),
28 "COUNT".into(),
29 count_str.into(),
30 ];
31
32 if conf.block_ms > 0 {
33 args.push("BLOCK".into());
34 args.push(conf.block_ms.to_string().into());
35 }
36
37 args.extend(vec![
38 "CLAIM".into(),
39 claim_str.into(),
40 "STREAMS".into(),
41 conf.stream.clone().into(),
42 ">".into(),
43 ]);
44
45 Self { conf, parse, args }
46 }
47
48 pub async fn run(&self) -> Void {
49 let group = &self.conf.group;
50 let stream = &self.conf.stream;
51
52 loop {
53 let li: FredResult<Vec<StreamItem>> = R
54 .custom(
55 CustomCommand::new("XREADGROUP", stream.as_bytes(), true),
56 self.args.clone(),
57 )
58 .await
59 .map(parse_stream);
60
61 if let Some(li) = auto_new(stream, group, li).await? {
62 if li.is_empty() {
63 break;
64 }
65
66 let ing = unsafe {
67 async_scoped::TokioScope::scope_and_collect(|s| {
68 for StreamItem { retry, kv, .. } in &li {
69 s.spawn(self.parse.run(kv, *retry))
70 }
71 })
72 }
73 .await
74 .1;
75
76 let mut id_li: Vec<String> = Vec::new();
77
78 let mut has_add = false;
79 let p = R.pipeline();
80 for (res, StreamItem { id, retry, kv, .. }) in ing.into_iter().zip(li.into_iter()) {
81 let err = match res {
82 Err(err) => err.to_string(),
83 Ok(task) => match task {
84 Ok(opt_kv) => {
85 id_li.push(id);
86 if let Some(new_kv) = opt_kv {
87 let _: () = p.xadd(&self.conf.stream, false, None, "*", new_kv).await?;
88 has_add = true;
89 }
90 continue;
91 }
92 Err(err) => err.to_string(),
93 },
94 };
95
96 log::error!("{id} retry {retry} {err}");
97 if retry > self.conf.max_retry {
98 id_li.push(id);
99 let task = self.parse.on_error(kv, err);
100 if let Err(e) = task.await {
101 log::error!("{e}");
102 }
103 }
104 }
105
106 if has_add {
107 let _: () = p.last().await?;
108 }
109
110 if !id_li.is_empty() {
111 crate::rm_id_li(&self.conf.stream, &self.conf.group, id_li).await?;
112 }
113 }
114 }
115 OK
116 }
117}
118
119impl<P: Parse> fmt::Display for ReadGroup<P> {
120 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121 write!(
122 f,
123 "{} {} {}",
124 String::from_utf8_lossy(self.conf.stream.as_bytes()),
125 self.conf.group,
126 self.conf.consumer
127 )
128 }
129}
130
131impl<P: Parse> fmt::Debug for ReadGroup<P> {
132 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
133 f.debug_struct("ReadGroup")
134 .field("conf", &self.conf)
135 .finish()
136 }
137}