1use std::collections::HashMap;
19use std::sync::Arc;
20use std::thread;
21
22use cheetah_string::CheetahString;
23use rocketmq_common::common::consumer::consume_from_where::ConsumeFromWhere;
24use rocketmq_common::common::message::message_ext::MessageExt;
25use rocketmq_common::common::message::message_queue::MessageQueue;
26use rocketmq_common::utils::util_all;
27use rocketmq_common::TimeUtils::get_current_millis;
28use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
29use rocketmq_remoting::protocol::namespace_util::NamespaceUtil;
30use rocketmq_remoting::runtime::RPCHook;
31use rocketmq_rust::ArcMut;
32use tokio::runtime::Handle;
33
34use crate::base::client_config::ClientConfig;
35use crate::base::mq_admin::MQAdmin;
36use crate::base::query_result::QueryResult;
37use crate::consumer::allocate_message_queue_strategy::AllocateMessageQueueStrategy;
38use crate::consumer::consumer_impl::default_mq_push_consumer_impl::DefaultMQPushConsumerImpl;
39use crate::consumer::default_mq_push_consumer_builder::DefaultMQPushConsumerBuilder;
40use crate::consumer::listener::consume_concurrently_context::ConsumeConcurrentlyContext;
41use crate::consumer::listener::consume_concurrently_status::ConsumeConcurrentlyStatus;
42use crate::consumer::listener::consume_orderly_context::ConsumeOrderlyContext;
43use crate::consumer::listener::consume_orderly_status::ConsumeOrderlyStatus;
44use crate::consumer::listener::message_listener::MessageListener;
45use crate::consumer::listener::message_listener_concurrently::MessageListenerConcurrently;
46use crate::consumer::listener::message_listener_orderly::MessageListenerOrderly;
47use crate::consumer::message_queue_listener::MessageQueueListener;
48use crate::consumer::message_selector::MessageSelector;
49use crate::consumer::mq_consumer::MQConsumer;
50use crate::consumer::mq_push_consumer::MQPushConsumer;
51use crate::consumer::rebalance_strategy::allocate_message_queue_averagely::AllocateMessageQueueAveragely;
52use crate::trace::async_trace_dispatcher::AsyncTraceDispatcher;
53use crate::trace::hook::consume_message_trace_hook_impl::ConsumeMessageTraceHookImpl;
54use crate::trace::trace_dispatcher::TraceDispatcher;
55use crate::trace::trace_dispatcher::Type;
56
57#[derive(Clone)]
58pub struct ConsumerConfig {
59 pub(crate) consumer_group: CheetahString,
60 pub(crate) topic: CheetahString,
61 pub(crate) sub_expression: CheetahString,
62 pub(crate) message_model: MessageModel,
63 pub(crate) consume_from_where: ConsumeFromWhere,
64 pub(crate) consume_timestamp: Option<CheetahString>,
65 pub(crate) allocate_message_queue_strategy: Option<Arc<dyn AllocateMessageQueueStrategy>>,
66 pub(crate) subscription: ArcMut<HashMap<CheetahString, CheetahString>>,
68 pub(crate) message_listener: Option<ArcMut<MessageListener>>,
69 pub(crate) message_queue_listener: Option<Arc<Box<dyn MessageQueueListener>>>,
70 pub(crate) consume_thread_min: u32,
71 pub(crate) consume_thread_max: u32,
72 pub(crate) adjust_thread_pool_nums_threshold: u64,
73 pub(crate) consume_concurrently_max_span: u32,
74 pub(crate) pull_threshold_for_queue: u32,
75 pub(crate) pop_threshold_for_queue: u32,
76 pub(crate) pull_threshold_size_for_queue: u32,
77 pub(crate) pull_threshold_for_topic: i32,
78 pub(crate) pull_threshold_size_for_topic: i32,
79 pub(crate) pull_interval: u64,
80 pub(crate) consume_message_batch_max_size: u32,
81 pub(crate) pull_batch_size: u32,
82 pub(crate) pull_batch_size_in_bytes: u32,
83 pub(crate) post_subscription_when_pull: bool,
84 pub(crate) unit_mode: bool,
85 pub(crate) max_reconsume_times: i32,
86 pub(crate) suspend_current_queue_time_millis: u64,
87 pub(crate) consume_timeout: u64,
88 pub(crate) pop_invisible_time: u64,
89 pub(crate) pop_batch_nums: u32,
90 pub(crate) await_termination_millis_when_shutdown: u64,
91 pub(crate) trace_dispatcher: Option<Arc<Box<dyn TraceDispatcher + Send + Sync>>>,
92 pub(crate) client_rebalance: bool,
93 pub(crate) rpc_hook: Option<Arc<Box<dyn RPCHook>>>,
94}
95
96impl ConsumerConfig {
97 pub fn consumer_group(&self) -> &CheetahString {
98 &self.consumer_group
99 }
100
101 pub fn message_model(&self) -> MessageModel {
102 self.message_model
103 }
104
105 pub fn consume_from_where(&self) -> ConsumeFromWhere {
106 self.consume_from_where
107 }
108
109 pub fn consume_timestamp(&self) -> &Option<CheetahString> {
110 &self.consume_timestamp
111 }
112
113 pub fn allocate_message_queue_strategy(&self) -> Option<Arc<dyn AllocateMessageQueueStrategy>> {
114 self.allocate_message_queue_strategy.clone()
115 }
116
117 pub fn subscription(&self) -> &ArcMut<HashMap<CheetahString, CheetahString>> {
118 &self.subscription
119 }
120
121 pub fn consume_thread_min(&self) -> u32 {
130 self.consume_thread_min
131 }
132
133 pub fn consume_thread_max(&self) -> u32 {
134 self.consume_thread_max
135 }
136
137 pub fn adjust_thread_pool_nums_threshold(&self) -> u64 {
138 self.adjust_thread_pool_nums_threshold
139 }
140
141 pub fn consume_concurrently_max_span(&self) -> u32 {
142 self.consume_concurrently_max_span
143 }
144
145 pub fn pull_threshold_for_queue(&self) -> u32 {
146 self.pull_threshold_for_queue
147 }
148
149 pub fn pop_threshold_for_queue(&self) -> u32 {
150 self.pop_threshold_for_queue
151 }
152
153 pub fn pull_threshold_size_for_queue(&self) -> u32 {
154 self.pull_threshold_size_for_queue
155 }
156
157 pub fn pull_threshold_for_topic(&self) -> i32 {
158 self.pull_threshold_for_topic
159 }
160
161 pub fn pull_threshold_size_for_topic(&self) -> i32 {
162 self.pull_threshold_size_for_topic
163 }
164
165 pub fn pull_interval(&self) -> u64 {
166 self.pull_interval
167 }
168
169 pub fn consume_message_batch_max_size(&self) -> u32 {
170 self.consume_message_batch_max_size
171 }
172
173 pub fn pull_batch_size(&self) -> u32 {
174 self.pull_batch_size
175 }
176
177 pub fn pull_batch_size_in_bytes(&self) -> u32 {
178 self.pull_batch_size_in_bytes
179 }
180
181 pub fn post_subscription_when_pull(&self) -> bool {
182 self.post_subscription_when_pull
183 }
184
185 pub fn unit_mode(&self) -> bool {
186 self.unit_mode
187 }
188
189 pub fn max_reconsume_times(&self) -> i32 {
190 self.max_reconsume_times
191 }
192
193 pub fn suspend_current_queue_time_millis(&self) -> u64 {
194 self.suspend_current_queue_time_millis
195 }
196
197 pub fn consume_timeout(&self) -> u64 {
198 self.consume_timeout
199 }
200
201 pub fn pop_invisible_time(&self) -> u64 {
202 self.pop_invisible_time
203 }
204
205 pub fn pop_batch_nums(&self) -> u32 {
206 self.pop_batch_nums
207 }
208
209 pub fn await_termination_millis_when_shutdown(&self) -> u64 {
210 self.await_termination_millis_when_shutdown
211 }
212
213 pub fn trace_dispatcher(&self) -> &Option<Arc<Box<dyn TraceDispatcher + Send + Sync>>> {
214 &self.trace_dispatcher
215 }
216
217 pub fn client_rebalance(&self) -> bool {
218 self.client_rebalance
219 }
220
221 pub fn rpc_hook(&self) -> &Option<Arc<Box<dyn RPCHook>>> {
222 &self.rpc_hook
223 }
224
225 pub fn set_consumer_group(&mut self, consumer_group: CheetahString) {
226 self.consumer_group = consumer_group;
227 }
228
229 pub fn set_message_model(&mut self, message_model: MessageModel) {
230 self.message_model = message_model;
231 }
232
233 pub fn set_consume_from_where(&mut self, consume_from_where: ConsumeFromWhere) {
234 self.consume_from_where = consume_from_where;
235 }
236
237 pub fn set_consume_timestamp(&mut self, consume_timestamp: Option<CheetahString>) {
238 self.consume_timestamp = consume_timestamp;
239 }
240
241 pub fn set_allocate_message_queue_strategy(
242 &mut self,
243 allocate_message_queue_strategy: Arc<dyn AllocateMessageQueueStrategy>,
244 ) {
245 self.allocate_message_queue_strategy = Some(allocate_message_queue_strategy);
246 }
247
248 pub fn set_subscription(
253 &mut self,
254 subscription: ArcMut<HashMap<CheetahString, CheetahString>>,
255 ) {
256 self.subscription = subscription;
257 }
258
259 pub fn set_message_queue_listener(
267 &mut self,
268 message_queue_listener: Option<Arc<Box<dyn MessageQueueListener>>>,
269 ) {
270 self.message_queue_listener = message_queue_listener;
271 }
272
273 pub fn set_consume_thread_min(&mut self, consume_thread_min: u32) {
274 self.consume_thread_min = consume_thread_min;
275 }
276
277 pub fn set_consume_thread_max(&mut self, consume_thread_max: u32) {
278 self.consume_thread_max = consume_thread_max;
279 }
280
281 pub fn set_adjust_thread_pool_nums_threshold(
282 &mut self,
283 adjust_thread_pool_nums_threshold: u64,
284 ) {
285 self.adjust_thread_pool_nums_threshold = adjust_thread_pool_nums_threshold;
286 }
287
288 pub fn set_consume_concurrently_max_span(&mut self, consume_concurrently_max_span: u32) {
289 self.consume_concurrently_max_span = consume_concurrently_max_span;
290 }
291
292 pub fn set_pull_threshold_for_queue(&mut self, pull_threshold_for_queue: u32) {
293 self.pull_threshold_for_queue = pull_threshold_for_queue;
294 }
295
296 pub fn set_pop_threshold_for_queue(&mut self, pop_threshold_for_queue: u32) {
297 self.pop_threshold_for_queue = pop_threshold_for_queue;
298 }
299
300 pub fn set_pull_threshold_size_for_queue(&mut self, pull_threshold_size_for_queue: u32) {
301 self.pull_threshold_size_for_queue = pull_threshold_size_for_queue;
302 }
303
304 pub fn set_pull_threshold_for_topic(&mut self, pull_threshold_for_topic: i32) {
305 self.pull_threshold_for_topic = pull_threshold_for_topic;
306 }
307
308 pub fn set_pull_threshold_size_for_topic(&mut self, pull_threshold_size_for_topic: i32) {
309 self.pull_threshold_size_for_topic = pull_threshold_size_for_topic;
310 }
311
312 pub fn set_pull_interval(&mut self, pull_interval: u64) {
313 self.pull_interval = pull_interval;
314 }
315
316 pub fn set_consume_message_batch_max_size(&mut self, consume_message_batch_max_size: u32) {
317 self.consume_message_batch_max_size = consume_message_batch_max_size;
318 }
319
320 pub fn set_pull_batch_size(&mut self, pull_batch_size: u32) {
321 self.pull_batch_size = pull_batch_size;
322 }
323
324 pub fn set_pull_batch_size_in_bytes(&mut self, pull_batch_size_in_bytes: u32) {
325 self.pull_batch_size_in_bytes = pull_batch_size_in_bytes;
326 }
327
328 pub fn set_post_subscription_when_pull(&mut self, post_subscription_when_pull: bool) {
329 self.post_subscription_when_pull = post_subscription_when_pull;
330 }
331
332 pub fn set_unit_mode(&mut self, unit_mode: bool) {
333 self.unit_mode = unit_mode;
334 }
335
336 pub fn set_max_reconsume_times(&mut self, max_reconsume_times: i32) {
337 self.max_reconsume_times = max_reconsume_times;
338 }
339
340 pub fn set_suspend_current_queue_time_millis(
341 &mut self,
342 suspend_current_queue_time_millis: u64,
343 ) {
344 self.suspend_current_queue_time_millis = suspend_current_queue_time_millis;
345 }
346
347 pub fn set_consume_timeout(&mut self, consume_timeout: u64) {
348 self.consume_timeout = consume_timeout;
349 }
350
351 pub fn set_pop_invisible_time(&mut self, pop_invisible_time: u64) {
352 self.pop_invisible_time = pop_invisible_time;
353 }
354
355 pub fn set_pop_batch_nums(&mut self, pop_batch_nums: u32) {
356 self.pop_batch_nums = pop_batch_nums;
357 }
358
359 pub fn set_await_termination_millis_when_shutdown(
360 &mut self,
361 await_termination_millis_when_shutdown: u64,
362 ) {
363 self.await_termination_millis_when_shutdown = await_termination_millis_when_shutdown;
364 }
365
366 pub fn set_trace_dispatcher(
367 &mut self,
368 trace_dispatcher: Option<Arc<Box<dyn TraceDispatcher + Send + Sync>>>,
369 ) {
370 self.trace_dispatcher = trace_dispatcher;
371 }
372
373 pub fn set_client_rebalance(&mut self, client_rebalance: bool) {
374 self.client_rebalance = client_rebalance;
375 }
376
377 pub fn set_rpc_hook(&mut self, rpc_hook: Option<Arc<Box<dyn RPCHook>>>) {
378 self.rpc_hook = rpc_hook;
379 }
380}
381
382impl Default for ConsumerConfig {
383 fn default() -> Self {
384 ConsumerConfig {
385 consumer_group: CheetahString::new(),
386 topic: CheetahString::new(),
387 sub_expression: CheetahString::new(),
388 message_model: MessageModel::Clustering,
389 consume_from_where: ConsumeFromWhere::ConsumeFromLastOffset,
390 consume_timestamp: Some(CheetahString::from_string(
391 util_all::time_millis_to_human_string3(
392 (get_current_millis() - (1000 * 60 * 30)) as i64,
393 ),
394 )),
395 allocate_message_queue_strategy: Some(Arc::new(AllocateMessageQueueAveragely)),
396 subscription: ArcMut::new(HashMap::new()),
397 message_listener: None,
398 message_queue_listener: None,
399
400 consume_thread_min: 20,
401 consume_thread_max: 64,
402 adjust_thread_pool_nums_threshold: 100_000,
403 consume_concurrently_max_span: 2000,
404 pull_threshold_for_queue: 1000,
405 pop_threshold_for_queue: 96,
406 pull_threshold_size_for_queue: 100,
407 pull_threshold_for_topic: -1,
408 pull_threshold_size_for_topic: -1,
409 pull_interval: 0,
410 consume_message_batch_max_size: 1,
411 pull_batch_size: 32,
412 pull_batch_size_in_bytes: 256 * 1024,
413 post_subscription_when_pull: false,
414 unit_mode: false,
415 max_reconsume_times: -1,
416 suspend_current_queue_time_millis: 1000,
417 consume_timeout: 15,
418 pop_invisible_time: 60000,
419 pop_batch_nums: 32,
420 await_termination_millis_when_shutdown: 0,
421 trace_dispatcher: None,
422 client_rebalance: true,
423 rpc_hook: None,
424 }
425 }
426}
427
428pub struct DefaultMQPushConsumer {
429 client_config: ClientConfig,
430 consumer_config: ArcMut<ConsumerConfig>,
431 pub(crate) default_mqpush_consumer_impl: Option<ArcMut<DefaultMQPushConsumerImpl>>,
432}
433
434impl MQConsumer for DefaultMQPushConsumer {
435 async fn send_message_back(
436 &mut self,
437 msg: MessageExt,
438 delay_level: i32,
439 broker_name: &str,
440 ) -> rocketmq_error::RocketMQResult<()> {
441 todo!()
442 }
443
444 async fn fetch_subscribe_message_queues(
445 &mut self,
446 topic: &str,
447 ) -> rocketmq_error::RocketMQResult<Vec<MessageQueue>> {
448 todo!()
449 }
450}
451
452impl MQAdmin for DefaultMQPushConsumer {
453 fn create_topic(
454 &self,
455 key: &str,
456 new_topic: &str,
457 queue_num: i32,
458 attributes: HashMap<String, String>,
459 ) -> rocketmq_error::RocketMQResult<()> {
460 panic!("This method is not implemented for DefaultMQPushConsumer");
461 }
462
463 fn create_topic_with_flag(
464 &self,
465 key: &str,
466 new_topic: &str,
467 queue_num: i32,
468 topic_sys_flag: i32,
469 attributes: HashMap<String, String>,
470 ) -> rocketmq_error::RocketMQResult<()> {
471 panic!("This method is not implemented for DefaultMQPushConsumer");
472 }
473
474 fn search_offset(
475 &self,
476 mq: &MessageQueue,
477 timestamp: u64,
478 ) -> rocketmq_error::RocketMQResult<i64> {
479 panic!("This method is not implemented for DefaultMQPushConsumer");
480 }
481
482 fn max_offset(&self, mq: &MessageQueue) -> rocketmq_error::RocketMQResult<i64> {
483 panic!("This method is not implemented for DefaultMQPushConsumer");
484 }
485
486 fn min_offset(&self, mq: &MessageQueue) -> rocketmq_error::RocketMQResult<i64> {
487 panic!("This method is not implemented for DefaultMQPushConsumer");
488 }
489
490 fn earliest_msg_store_time(&self, mq: &MessageQueue) -> rocketmq_error::RocketMQResult<u64> {
491 panic!("This method is not implemented for DefaultMQPushConsumer");
492 }
493
494 fn query_message(
495 &self,
496 topic: &str,
497 key: &str,
498 max_num: i32,
499 begin: u64,
500 end: u64,
501 ) -> rocketmq_error::RocketMQResult<QueryResult> {
502 panic!("This method is not implemented for DefaultMQPushConsumer");
503 }
504
505 fn view_message(
506 &self,
507 topic: &str,
508 msg_id: &str,
509 ) -> rocketmq_error::RocketMQResult<MessageExt> {
510 panic!("This method is not implemented for DefaultMQPushConsumer");
511 }
512}
513
514impl MQPushConsumer for DefaultMQPushConsumer {
515 async fn start(&mut self) -> rocketmq_error::RocketMQResult<()> {
516 let consumer_group = NamespaceUtil::wrap_namespace(
517 self.client_config
518 .get_namespace()
519 .unwrap_or_default()
520 .as_str(),
521 self.consumer_config.consumer_group.as_str(),
522 );
523 self.set_consumer_group(consumer_group.as_str());
524 self.default_mqpush_consumer_impl
525 .as_mut()
526 .unwrap()
527 .start()
528 .await?;
529
530 if self.client_config.enable_trace {
531 let mut dispatcher = AsyncTraceDispatcher::new(
532 self.consumer_config.consumer_group.as_str(),
533 Type::Consume,
534 self.client_config.trace_topic.clone().unwrap().as_str(),
535 self.consumer_config.rpc_hook.clone(),
536 );
537 dispatcher
538 .set_host_consumer(self.default_mqpush_consumer_impl.as_ref().unwrap().clone());
539 dispatcher.set_namespace_v2(self.client_config.namespace_v2.clone());
540 let dispatcher: Arc<Box<dyn TraceDispatcher + Send + Sync>> =
541 Arc::new(Box::new(dispatcher));
542 self.consumer_config.trace_dispatcher = Some(dispatcher.clone());
543 let default_mqpush_consumer_impl = self.default_mqpush_consumer_impl.as_mut().unwrap();
544 default_mqpush_consumer_impl.register_consume_message_hook(
545 ConsumeMessageTraceHookImpl::new(dispatcher.clone()),
546 );
547 }
548
549 if let Some(ref rpc_hook) = self.consumer_config.trace_dispatcher {
550 unimplemented!("trace hook");
551 }
552
553 Ok(())
554 }
555
556 async fn shutdown(&mut self) {
557 todo!()
558 }
559
560 fn register_message_listener_concurrently_fn<MLCFN>(&mut self, message_listener: MLCFN)
561 where
562 MLCFN: Fn(
563 Vec<MessageExt>,
564 ConsumeConcurrentlyContext,
565 ) -> rocketmq_error::RocketMQResult<ConsumeConcurrentlyStatus>
566 + Send
567 + Sync,
568 {
569 todo!()
570 }
571
572 fn register_message_listener_concurrently<ML>(&mut self, message_listener: ML)
573 where
574 ML: MessageListenerConcurrently + Send + Sync + 'static,
575 {
576 let message_listener = MessageListener {
577 message_listener_concurrently: Some((Some(Arc::new(Box::new(message_listener))), None)),
578 message_listener_orderly: None,
579 };
580 self.consumer_config.message_listener = Some(ArcMut::new(message_listener));
581 self.default_mqpush_consumer_impl
582 .as_mut()
583 .unwrap()
584 .register_message_listener(self.consumer_config.message_listener.clone());
585 }
586
587 async fn register_message_listener_orderly_fn<MLOFN>(&mut self, message_listener: MLOFN)
588 where
589 MLOFN: Fn(
590 Vec<MessageExt>,
591 ConsumeOrderlyContext,
592 ) -> rocketmq_error::RocketMQResult<ConsumeOrderlyStatus>
593 + Send
594 + Sync,
595 {
596 todo!()
597 }
598
599 fn register_message_listener_orderly<ML>(&mut self, message_listener: ML)
600 where
601 ML: MessageListenerOrderly + Send + Sync + 'static,
602 {
603 let message_listener = MessageListener {
604 message_listener_concurrently: None,
605 message_listener_orderly: Some((Some(Arc::new(Box::new(message_listener))), None)),
606 };
607 self.consumer_config.message_listener = Some(ArcMut::new(message_listener));
608 self.default_mqpush_consumer_impl
609 .as_mut()
610 .unwrap()
611 .register_message_listener(self.consumer_config.message_listener.clone());
612 }
613
614 fn subscribe(
615 &mut self,
616 topic: &str,
617 sub_expression: &str,
618 ) -> rocketmq_error::RocketMQResult<()> {
619 let handle = Handle::current();
620 let mut default_mqpush_consumer_impl = self.default_mqpush_consumer_impl.clone();
621 let topic = topic.to_string();
622 let sub_expression = sub_expression.to_string();
623 match thread::spawn(move || {
624 handle.block_on(async move {
625 default_mqpush_consumer_impl
626 .as_mut()
627 .unwrap()
628 .subscribe(topic.into(), sub_expression.into())
629 .await
630 })
631 })
632 .join()
633 {
634 Ok(value) => value,
635 Err(er) => {
636 panic!("Error: {er:?}");
637 }
638 }
639 }
640
641 async fn subscribe_with_selector(
642 &mut self,
643 topic: &str,
644 selector: Option<MessageSelector>,
645 ) -> rocketmq_error::RocketMQResult<()> {
646 todo!()
647 }
648
649 async fn unsubscribe(&mut self, topic: &str) {
650 todo!()
651 }
652
653 async fn suspend(&mut self) {
654 todo!()
655 }
656
657 async fn resume(&mut self) {
658 todo!()
659 }
660}
661
662impl DefaultMQPushConsumer {
663 pub fn builder() -> DefaultMQPushConsumerBuilder {
664 DefaultMQPushConsumerBuilder::default()
665 }
666
667 #[inline]
668 pub fn set_consumer_group(&mut self, consumer_group: impl Into<CheetahString>) {
669 self.consumer_config.consumer_group = consumer_group.into();
670 }
671
672 pub fn new(
673 client_config: ClientConfig,
674 consumer_config: ConsumerConfig,
675 ) -> DefaultMQPushConsumer {
676 let consumer_config = ArcMut::new(consumer_config);
677 let mut default_mqpush_consumer_impl = ArcMut::new(DefaultMQPushConsumerImpl::new(
678 client_config.clone(),
679 consumer_config.clone(),
680 consumer_config.rpc_hook.clone(),
681 ));
682 let wrapper = default_mqpush_consumer_impl.clone();
683 default_mqpush_consumer_impl.set_default_mqpush_consumer_impl(wrapper);
684 DefaultMQPushConsumer {
685 client_config,
686 consumer_config,
687 default_mqpush_consumer_impl: Some(default_mqpush_consumer_impl),
688 }
689 }
690
691 pub fn set_name_server_addr(&mut self, name_server_addr: CheetahString) {
692 self.client_config.namesrv_addr = Some(name_server_addr);
693 self.client_config
694 .namespace_initialized
695 .store(false, std::sync::atomic::Ordering::Release);
696 }
697
698 pub fn set_consume_from_where(&mut self, consume_from_where: ConsumeFromWhere) {
699 self.consumer_config.consume_from_where = consume_from_where;
700 }
701}