msgq/
read_group.rs

1use std::fmt;
2
3use aok::{OK, Void};
4use fred::{
5  interfaces::{ClientLike, StreamsInterface},
6  prelude::FredResult,
7  types::{CustomCommand, Value},
8};
9use xkv::R;
10
11use crate::{Conf, Parse, StreamItem, auto_new, parse_stream};
12
13pub struct ReadGroup<P: Parse> {
14  conf: Conf,
15  parse: P,
16  args: Vec<Value>,
17}
18
19impl<P: Parse + Send + Sync + 'static> ReadGroup<P> {
20  pub fn new(parse: P, conf: Conf) -> Self {
21    let count_str = conf.count.to_string();
22    let claim_str = conf.claim_idle_ms.to_string();
23
24    let mut args: Vec<Value> = vec![
25      "GROUP".into(),
26      conf.group.clone().into(),
27      conf.consumer.clone().into(),
28      "COUNT".into(),
29      count_str.into(),
30    ];
31
32    if conf.block_ms > 0 {
33      args.push("BLOCK".into());
34      args.push(conf.block_ms.to_string().into());
35    }
36
37    args.extend(vec![
38      "CLAIM".into(),
39      claim_str.into(),
40      "STREAMS".into(),
41      conf.stream.clone().into(),
42      ">".into(),
43    ]);
44
45    Self { conf, parse, args }
46  }
47
48  pub async fn run(&self) -> Void {
49    let group = &self.conf.group;
50    let stream = &self.conf.stream;
51
52    loop {
53      let li: FredResult<Vec<StreamItem>> = R
54        .custom(
55          CustomCommand::new("XREADGROUP", stream.as_bytes(), true),
56          self.args.clone(),
57        )
58        .await
59        .map(parse_stream);
60
61      if let Some(li) = auto_new(stream, group, li).await? {
62        if li.is_empty() {
63          break;
64        }
65
66        let ing = unsafe {
67          async_scoped::TokioScope::scope_and_collect(|s| {
68            for StreamItem { retry, kv, .. } in &li {
69              s.spawn(self.parse.run(kv, *retry))
70            }
71          })
72        }
73        .await
74        .1;
75
76        let mut id_li: Vec<String> = Vec::new();
77
78        let mut has_add = false;
79        let p = R.pipeline();
80        for (res, StreamItem { id, retry, kv, .. }) in ing.into_iter().zip(li.into_iter()) {
81          let err = match res {
82            Err(err) => err.to_string(),
83            Ok(task) => match task {
84              Ok(opt_kv) => {
85                id_li.push(id);
86                if let Some(new_kv) = opt_kv {
87                  let _: () = p.xadd(&self.conf.stream, false, None, "*", new_kv).await?;
88                  has_add = true;
89                }
90                continue;
91              }
92              Err(err) => err.to_string(),
93            },
94          };
95
96          log::error!("{id} retry {retry} {err}");
97          if retry > self.conf.max_retry {
98            id_li.push(id);
99            let task = self.parse.on_error(kv, err);
100            if let Err(e) = task.await {
101              log::error!("{e}");
102            }
103          }
104        }
105
106        if has_add {
107          let _: () = p.last().await?;
108        }
109
110        if !id_li.is_empty() {
111          crate::rm_id_li(&self.conf.stream, &self.conf.group, id_li).await?;
112        }
113      }
114    }
115    OK
116  }
117}
118
119impl<P: Parse> fmt::Display for ReadGroup<P> {
120  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121    write!(
122      f,
123      "{} {} {}",
124      String::from_utf8_lossy(self.conf.stream.as_bytes()),
125      self.conf.group,
126      self.conf.consumer
127    )
128  }
129}
130
131impl<P: Parse> fmt::Debug for ReadGroup<P> {
132  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
133    f.debug_struct("ReadGroup")
134      .field("conf", &self.conf)
135      .finish()
136  }
137}