crabka_client_consumer/
commit.rs1use std::collections::HashMap;
4
5use crabka_protocol::owned::offset_commit_request::OffsetCommitRequest;
6
7use crate::consumer::Consumer;
8use crate::error::ConsumerError;
9use crate::offset_wire::build_commit_topics;
10
11impl Consumer {
12 pub async fn commit_sync(&self) -> Result<(), ConsumerError> {
15 let raw_offsets = self.next_offsets.lock().await.clone();
16 if raw_offsets.is_empty() {
17 return Ok(());
18 }
19 let pos = self.positions.lock().await;
20 let offsets: HashMap<(String, i32), (i64, i32)> = raw_offsets
21 .into_iter()
22 .map(|(k, v)| {
23 let epoch = pos.get(&k).map_or(-1, |p| p.offset_epoch);
24 (k, (v, epoch))
25 })
26 .collect();
27 drop(pos);
28 let topic_ids = self.topic_ids.lock().await.clone();
29 let topics = build_commit_topics(offsets, &topic_ids);
30
31 let resp = self
32 .client
33 .send(OffsetCommitRequest {
34 group_id: self.group_id.clone(),
35 generation_id_or_member_epoch: self.generation_id,
36 member_id: self.member_id.clone(),
37 topics,
38 ..Default::default()
39 })
40 .await?;
41
42 for t in &resp.topics {
44 for p in &t.partitions {
45 if p.error_code != 0 {
46 return Err(ConsumerError::Server(p.error_code));
47 }
48 }
49 }
50 Ok(())
51 }
52
53 pub fn commit_async(&self) {
57 let client = self.client.clone();
58 let group_id = self.group_id.clone();
59 let generation = self.generation_id;
60 let member_id = self.member_id.clone();
61 let offsets = self.next_offsets.clone();
62 let positions = self.positions.clone();
63 let topic_ids = self.topic_ids.clone();
64 tokio::spawn(async move {
65 let raw_snapshot = offsets.lock().await.clone();
66 if raw_snapshot.is_empty() {
67 return;
68 }
69 let pos = positions.lock().await;
70 let snapshot: HashMap<(String, i32), (i64, i32)> = raw_snapshot
71 .into_iter()
72 .map(|(k, v)| {
73 let epoch = pos.get(&k).map_or(-1, |p| p.offset_epoch);
74 (k, (v, epoch))
75 })
76 .collect();
77 drop(pos);
78 let topic_ids = topic_ids.lock().await.clone();
79 let topics = build_commit_topics(snapshot, &topic_ids);
80 let res = client
81 .send(OffsetCommitRequest {
82 group_id,
83 generation_id_or_member_epoch: generation,
84 member_id,
85 topics,
86 ..Default::default()
87 })
88 .await;
89 if let Err(e) = res {
90 tracing::warn!(error = %e, "commit_async failed");
91 }
92 });
93 }
94}