Skip to main content

crabka_client_consumer/
commit.rs

1//! `Consumer::commit_sync` and `commit_async`.
2
3use std::collections::HashMap;
4
5use crabka_protocol::owned::offset_commit_request::OffsetCommitRequest;
6
7use crate::consumer::Consumer;
8use crate::error::ConsumerError;
9use crate::offset_wire::build_commit_topics;
10
11impl Consumer {
12    /// Commit the current next-offsets for every assigned partition.
13    /// Blocks until the broker acks.
14    pub async fn commit_sync(&self) -> Result<(), ConsumerError> {
15        let raw_offsets = self.next_offsets.lock().await.clone();
16        if raw_offsets.is_empty() {
17            return Ok(());
18        }
19        let pos = self.positions.lock().await;
20        let offsets: HashMap<(String, i32), (i64, i32)> = raw_offsets
21            .into_iter()
22            .map(|(k, v)| {
23                let epoch = pos.get(&k).map_or(-1, |p| p.offset_epoch);
24                (k, (v, epoch))
25            })
26            .collect();
27        drop(pos);
28        let topic_ids = self.topic_ids.lock().await.clone();
29        let topics = build_commit_topics(offsets, &topic_ids);
30
31        let resp = self
32            .client
33            .send(OffsetCommitRequest {
34                group_id: self.group_id.clone(),
35                generation_id_or_member_epoch: self.generation_id,
36                member_id: self.member_id.clone(),
37                topics,
38                ..Default::default()
39            })
40            .await?;
41
42        // Surface the first non-zero error_code if any.
43        for t in &resp.topics {
44            for p in &t.partitions {
45                if p.error_code != 0 {
46                    return Err(ConsumerError::Server(p.error_code));
47                }
48            }
49        }
50        Ok(())
51    }
52
53    /// Fire-and-forget commit. Returns once the request is enqueued on the
54    /// client's writer task; does NOT wait for the broker ack. Errors are
55    /// logged but not returned.
56    pub fn commit_async(&self) {
57        let client = self.client.clone();
58        let group_id = self.group_id.clone();
59        let generation = self.generation_id;
60        let member_id = self.member_id.clone();
61        let offsets = self.next_offsets.clone();
62        let positions = self.positions.clone();
63        let topic_ids = self.topic_ids.clone();
64        tokio::spawn(async move {
65            let raw_snapshot = offsets.lock().await.clone();
66            if raw_snapshot.is_empty() {
67                return;
68            }
69            let pos = positions.lock().await;
70            let snapshot: HashMap<(String, i32), (i64, i32)> = raw_snapshot
71                .into_iter()
72                .map(|(k, v)| {
73                    let epoch = pos.get(&k).map_or(-1, |p| p.offset_epoch);
74                    (k, (v, epoch))
75                })
76                .collect();
77            drop(pos);
78            let topic_ids = topic_ids.lock().await.clone();
79            let topics = build_commit_topics(snapshot, &topic_ids);
80            let res = client
81                .send(OffsetCommitRequest {
82                    group_id,
83                    generation_id_or_member_epoch: generation,
84                    member_id,
85                    topics,
86                    ..Default::default()
87                })
88                .await;
89            if let Err(e) = res {
90                tracing::warn!(error = %e, "commit_async failed");
91            }
92        });
93    }
94}