1use rama::{Context, Service};
16use tansu_sans_io::{
17 ApiKey, ErrorCode, ProduceRequest, ProduceResponse,
18 produce_request::{PartitionProduceData, TopicProduceData},
19 produce_response::{PartitionProduceResponse, TopicProduceResponse},
20};
21use tracing::{debug, error, instrument, warn};
22
23use crate::{Error, Result, Storage, Topition};
24
25#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
130pub struct ProduceService;
131
132impl ApiKey for ProduceService {
133 const KEY: i16 = ProduceRequest::KEY;
134}
135
136impl ProduceService {
137 fn error(&self, index: i32, error_code: ErrorCode) -> PartitionProduceResponse {
138 PartitionProduceResponse::default()
139 .index(index)
140 .error_code(error_code.into())
141 .base_offset(-1)
142 .log_append_time_ms(Some(-1))
143 .log_start_offset(Some(0))
144 .record_errors(Some([].into()))
145 .error_message(None)
146 .current_leader(None)
147 }
148
149 #[instrument(skip_all)]
150 async fn partition<G>(
151 &self,
152 ctx: Context<G>,
153 transaction_id: Option<&str>,
154 name: &str,
155 partition: PartitionProduceData,
156 ) -> PartitionProduceResponse
157 where
158 G: Storage,
159 {
160 if let Some(records) = partition.records {
161 let mut base_offset = None;
162
163 for batch in records.batches {
164 let tp = Topition::new(name, partition.index);
165
166 match ctx
167 .state()
168 .produce(transaction_id, &tp, batch)
169 .await
170 .inspect_err(|err| match err {
171 storage_api @ Error::Api(_) => {
172 warn!(?storage_api)
173 }
174 otherwise => error!(?otherwise),
175 }) {
176 Ok(offset) => _ = base_offset.get_or_insert(offset),
177
178 Err(Error::Api(error_code)) => {
179 debug!(?self, ?error_code);
180 return self.error(partition.index, error_code);
181 }
182
183 Err(otherwise) => {
184 warn!(?otherwise);
185 let error = self.error(partition.index, ErrorCode::UnknownServerError);
186 return error;
187 }
188 }
189 }
190
191 if let Some(base_offset) = base_offset {
192 PartitionProduceResponse::default()
193 .index(partition.index)
194 .error_code(ErrorCode::None.into())
195 .base_offset(base_offset)
196 .log_append_time_ms(Some(-1))
197 .log_start_offset(Some(0))
198 .record_errors(Some([].into()))
199 .error_message(None)
200 .current_leader(None)
201 } else {
202 self.error(partition.index, ErrorCode::UnknownServerError)
203 }
204 } else {
205 self.error(partition.index, ErrorCode::UnknownServerError)
206 }
207 }
208
209 #[instrument(skip_all)]
210 async fn topic<G>(
211 &self,
212 ctx: Context<G>,
213 transaction_id: Option<&str>,
214 topic: TopicProduceData,
215 ) -> TopicProduceResponse
216 where
217 G: Storage,
218 {
219 let mut partitions = vec![];
220
221 if let Some(partition_data) = topic.partition_data {
222 for partition in partition_data {
223 partitions.push(
224 self.partition(ctx.clone(), transaction_id, &topic.name, partition)
225 .await,
226 )
227 }
228 }
229
230 TopicProduceResponse::default()
231 .name(topic.name)
232 .partition_responses(Some(partitions))
233 }
234}
235
236impl<G> Service<G, ProduceRequest> for ProduceService
237where
238 G: Storage,
239{
240 type Response = ProduceResponse;
241 type Error = Error;
242
243 #[instrument(skip(ctx, req))]
244 async fn serve(
245 &self,
246 ctx: Context<G>,
247 req: ProduceRequest,
248 ) -> Result<Self::Response, Self::Error> {
249 let mut responses = Vec::with_capacity(
250 req.topic_data
251 .as_ref()
252 .map_or(0, |topic_data| topic_data.len()),
253 );
254
255 if let Some(topics) = req.topic_data {
256 for topic in topics {
257 responses.push(
258 self.topic(ctx.clone(), req.transactional_id.as_deref(), topic)
259 .await,
260 )
261 }
262 }
263
264 Ok(ProduceResponse::default()
265 .responses(Some(responses))
266 .throttle_time_ms(Some(0))
267 .node_endpoints(None))
268 }
269}
270
271#[cfg(all(test, feature = "dynostore"))]
272mod tests {
273 use super::*;
274 use crate::{Error, dynostore::DynoStore, service::init_producer_id::InitProducerIdService};
275 use bytes::Bytes;
276 use object_store::memory::InMemory;
277 use rama::Context;
278 use tansu_sans_io::{
279 ErrorCode, InitProducerIdRequest,
280 record::{
281 Record,
282 deflated::{self, Frame},
283 inflated,
284 },
285 };
286 use tracing::subscriber::DefaultGuard;
287
288 fn init_tracing() -> Result<DefaultGuard> {
289 use std::{fs::File, sync::Arc, thread};
290
291 use tracing::Level;
292 use tracing_subscriber::fmt::format::FmtSpan;
293
294 Ok(tracing::subscriber::set_default(
295 tracing_subscriber::fmt()
296 .with_level(true)
297 .with_line_number(true)
298 .with_thread_names(false)
299 .with_max_level(Level::DEBUG)
300 .with_span_events(FmtSpan::ACTIVE)
301 .with_writer(
302 thread::current()
303 .name()
304 .ok_or(Error::Message(String::from("unnamed thread")))
305 .and_then(|name| {
306 File::create(format!("../logs/{}/{name}.log", env!("CARGO_PKG_NAME")))
307 .map_err(Into::into)
308 })
309 .map(Arc::new)?,
310 )
311 .finish(),
312 ))
313 }
314
315 fn topic_data(
316 topic: &str,
317 index: i32,
318 builder: inflated::Builder,
319 ) -> Result<Option<Vec<TopicProduceData>>> {
320 builder
321 .build()
322 .and_then(deflated::Batch::try_from)
323 .map(|deflated| {
324 let partition_data =
325 PartitionProduceData::default()
326 .index(index)
327 .records(Some(Frame {
328 batches: vec![deflated],
329 }));
330
331 Some(vec![
332 TopicProduceData::default()
333 .name(topic.into())
334 .partition_data(Some(vec![partition_data])),
335 ])
336 })
337 .map_err(Into::into)
338 }
339
340 #[tokio::test]
341 async fn non_txn_idempotent_unknown_producer_id() -> Result<()> {
342 let _guard = init_tracing()?;
343
344 let cluster = "abc";
345 let node = 12321;
346
347 let topic = "pqr";
348 let index = 0;
349
350 let transactional_id = None;
351 let acks = 0;
352 let timeout_ms = 0;
353
354 let storage = DynoStore::new(cluster, node, InMemory::new());
355 let ctx = Context::with_state(storage);
356 let service = ProduceService;
357
358 assert_eq!(
359 ProduceResponse::default()
360 .responses(Some(vec![
361 TopicProduceResponse::default()
362 .name(topic.into())
363 .partition_responses(Some(vec![
364 PartitionProduceResponse::default()
365 .index(index)
366 .error_code(ErrorCode::UnknownProducerId.into())
367 .base_offset(-1)
368 .log_append_time_ms(Some(-1))
369 .log_start_offset(Some(0))
370 .record_errors(Some(vec![]))
371 .error_message(None)
372 .current_leader(None)
373 ]))
374 ]))
375 .throttle_time_ms(Some(0))
376 .node_endpoints(None),
377 service
378 .serve(
379 ctx,
380 ProduceRequest::default()
381 .transactional_id(transactional_id)
382 .acks(acks)
383 .timeout_ms(timeout_ms)
384 .topic_data(topic_data(
385 topic,
386 index,
387 inflated::Batch::builder()
388 .record(
389 Record::builder().value(Bytes::from_static(b"lorem").into())
390 )
391 .producer_id(54345)
392 )?)
393 )
394 .await?
395 );
396
397 Ok(())
398 }
399
400 #[tokio::test]
401 async fn non_txn_idempotent() -> Result<()> {
402 let _guard = init_tracing()?;
403
404 let cluster = "abc";
405 let node = 12321;
406 let topic = "pqr";
407 let index = 0;
408
409 let storage = DynoStore::new(cluster, node, InMemory::new());
410 let ctx = Context::with_state(storage);
411
412 let init_producer_id = InitProducerIdService;
413
414 let producer = init_producer_id
415 .serve(
416 ctx.clone(),
417 InitProducerIdRequest::default()
418 .transactional_id(None)
419 .transaction_timeout_ms(0)
420 .producer_id(Some(-1))
421 .producer_epoch(Some(-1)),
422 )
423 .await?;
424
425 let request = ProduceService;
426
427 let transactional_id = None;
428 let acks = 0;
429 let timeout_ms = 0;
430
431 assert_eq!(
432 ProduceResponse::default()
433 .responses(Some(vec![
434 TopicProduceResponse::default()
435 .name(topic.into())
436 .partition_responses(Some(vec![
437 PartitionProduceResponse::default()
438 .index(index)
439 .error_code(ErrorCode::None.into())
440 .base_offset(0)
441 .log_append_time_ms(Some(-1))
442 .log_start_offset(Some(0))
443 .record_errors(Some(vec![]))
444 .error_message(None)
445 .current_leader(None)
446 ]))
447 ]))
448 .throttle_time_ms(Some(0))
449 .node_endpoints(None),
450 request
451 .serve(
452 ctx.clone(),
453 ProduceRequest::default()
454 .transactional_id(transactional_id.clone())
455 .acks(acks)
456 .timeout_ms(timeout_ms)
457 .topic_data(topic_data(
458 topic,
459 index,
460 inflated::Batch::builder()
461 .record(Record::builder().value(
462 Bytes::from_static(b"Lorem ipsum dolor sit amet").into()
463 ))
464 .producer_id(producer.producer_id)
465 )?)
466 )
467 .await?
468 );
469
470 assert_eq!(
471 ProduceResponse::default()
472 .responses(Some(vec![
473 TopicProduceResponse::default()
474 .name(topic.into())
475 .partition_responses(Some(vec![
476 PartitionProduceResponse::default()
477 .index(index)
478 .error_code(ErrorCode::None.into())
479 .base_offset(1)
480 .log_append_time_ms(Some(-1))
481 .log_start_offset(Some(0))
482 .record_errors(Some(vec![]))
483 .error_message(None)
484 .current_leader(None)
485 ]))
486 ]))
487 .throttle_time_ms(Some(0))
488 .node_endpoints(None),
489 request
490 .serve(
491 ctx.clone(),
492 ProduceRequest::default()
493 .transactional_id(transactional_id.clone())
494 .acks(acks)
495 .timeout_ms(timeout_ms)
496 .topic_data(topic_data(
497 topic,
498 index,
499 inflated::Batch::builder()
500 .record(Record::builder().value(
501 Bytes::from_static(b"consectetur adipiscing elit").into()
502 ))
503 .record(
504 Record::builder()
505 .value(Bytes::from_static(b"sed do eiusmod tempor").into())
506 )
507 .base_sequence(1)
508 .last_offset_delta(1)
509 .producer_id(producer.producer_id)
510 )?)
511 )
512 .await?
513 );
514
515 assert_eq!(
516 ProduceResponse::default()
517 .responses(Some(vec![
518 TopicProduceResponse::default()
519 .name(topic.into())
520 .partition_responses(Some(vec![
521 PartitionProduceResponse::default()
522 .index(index)
523 .error_code(ErrorCode::None.into())
524 .base_offset(3)
525 .log_append_time_ms(Some(-1))
526 .log_start_offset(Some(0))
527 .record_errors(Some(vec![]))
528 .error_message(None)
529 .current_leader(None)
530 ]))
531 ]))
532 .throttle_time_ms(Some(0))
533 .node_endpoints(None),
534 request
535 .serve(
536 ctx,
537 ProduceRequest::default()
538 .transactional_id(transactional_id.clone())
539 .acks(acks)
540 .timeout_ms(timeout_ms)
541 .topic_data(topic_data(
542 topic,
543 index,
544 inflated::Batch::builder()
545 .record(
546 Record::builder()
547 .value(Bytes::from_static(b"incididunt ut labore").into())
548 )
549 .base_sequence(3)
550 .producer_id(producer.producer_id)
551 )?)
552 )
553 .await?
554 );
555
556 Ok(())
557 }
558
559 #[tokio::test]
560 async fn non_txn_idempotent_duplicate_sequence() -> Result<()> {
561 let _guard = init_tracing()?;
562
563 let cluster = "abc";
564 let node = 12321;
565 let topic = "pqr";
566 let index = 0;
567
568 let storage = DynoStore::new(cluster, node, InMemory::new());
569 let ctx = Context::with_state(storage);
570
571 let init_producer_id = InitProducerIdService;
572
573 let producer = init_producer_id
574 .serve(
575 ctx.clone(),
576 InitProducerIdRequest::default()
577 .transactional_id(None)
578 .transaction_timeout_ms(0)
579 .producer_id(Some(-1))
580 .producer_epoch(Some(-1)),
581 )
582 .await?;
583
584 let request = ProduceService;
585
586 let transactional_id = None;
587 let acks = 0;
588 let timeout_ms = 0;
589
590 assert_eq!(
591 ProduceResponse::default()
592 .responses(Some(vec![
593 TopicProduceResponse::default()
594 .name(topic.into())
595 .partition_responses(Some(vec![
596 PartitionProduceResponse::default()
597 .index(index)
598 .error_code(ErrorCode::None.into())
599 .base_offset(0)
600 .log_append_time_ms(Some(-1))
601 .log_start_offset(Some(0))
602 .record_errors(Some(vec![]))
603 .error_message(None)
604 .current_leader(None)
605 ]))
606 ]))
607 .throttle_time_ms(Some(0))
608 .node_endpoints(None),
609 request
610 .serve(
611 ctx.clone(),
612 ProduceRequest::default()
613 .transactional_id(transactional_id.clone())
614 .acks(acks)
615 .timeout_ms(timeout_ms)
616 .topic_data(topic_data(
617 topic,
618 index,
619 inflated::Batch::builder()
620 .record(Record::builder().value(
621 Bytes::from_static(b"Lorem ipsum dolor sit amet").into()
622 ))
623 .producer_id(producer.producer_id)
624 )?)
625 )
626 .await?
627 );
628
629 assert_eq!(
630 ProduceResponse::default()
631 .responses(Some(vec![
632 TopicProduceResponse::default()
633 .name(topic.into())
634 .partition_responses(Some(vec![
635 PartitionProduceResponse::default()
636 .index(index)
637 .error_code(ErrorCode::DuplicateSequenceNumber.into())
638 .base_offset(-1)
639 .log_append_time_ms(Some(-1))
640 .log_start_offset(Some(0))
641 .record_errors(Some(vec![]))
642 .error_message(None)
643 .current_leader(None)
644 ]))
645 ]))
646 .throttle_time_ms(Some(0))
647 .node_endpoints(None),
648 request
649 .serve(
650 ctx,
651 ProduceRequest::default()
652 .transactional_id(transactional_id)
653 .acks(acks)
654 .timeout_ms(timeout_ms)
655 .topic_data(topic_data(
656 topic,
657 index,
658 inflated::Batch::builder()
659 .record(Record::builder().value(
660 Bytes::from_static(b"Lorem ipsum dolor sit amet").into()
661 ))
662 .producer_id(producer.producer_id)
663 )?)
664 )
665 .await?
666 );
667
668 Ok(())
669 }
670
671 #[tokio::test]
672 async fn non_txn_idempotent_sequence_out_of_order() -> Result<()> {
673 let _guard = init_tracing()?;
674
675 let cluster = "abc";
676 let node = 12321;
677 let topic = "pqr";
678 let index = 0;
679
680 let storage = DynoStore::new(cluster, node, InMemory::new());
681 let ctx = Context::with_state(storage);
682
683 let init_producer_id = InitProducerIdService;
684
685 let producer = init_producer_id
686 .serve(
687 ctx.clone(),
688 InitProducerIdRequest::default()
689 .transactional_id(None)
690 .transaction_timeout_ms(0)
691 .producer_id(Some(-1))
692 .producer_epoch(Some(-1)),
693 )
694 .await?;
695
696 let request = ProduceService;
697
698 let transactional_id = None;
699 let acks = 0;
700 let timeout_ms = 0;
701
702 assert_eq!(
703 ProduceResponse::default()
704 .responses(Some(vec![
705 TopicProduceResponse::default()
706 .name(topic.into())
707 .partition_responses(Some(vec![
708 PartitionProduceResponse::default()
709 .index(index)
710 .error_code(ErrorCode::None.into())
711 .base_offset(0)
712 .log_append_time_ms(Some(-1))
713 .log_start_offset(Some(0))
714 .record_errors(Some(vec![]))
715 .error_message(None)
716 .current_leader(None)
717 ]))
718 ]))
719 .throttle_time_ms(Some(0))
720 .node_endpoints(None),
721 request
722 .serve(
723 ctx.clone(),
724 ProduceRequest::default()
725 .transactional_id(transactional_id.clone())
726 .acks(acks)
727 .timeout_ms(timeout_ms)
728 .topic_data(topic_data(
729 topic,
730 index,
731 inflated::Batch::builder()
732 .record(Record::builder().value(
733 Bytes::from_static(b"Lorem ipsum dolor sit amet").into()
734 ))
735 .producer_id(producer.producer_id)
736 )?)
737 )
738 .await?
739 );
740
741 assert_eq!(
742 ProduceResponse::default()
743 .responses(Some(vec![
744 TopicProduceResponse::default()
745 .name(topic.into())
746 .partition_responses(Some(vec![
747 PartitionProduceResponse::default()
748 .index(index)
749 .error_code(ErrorCode::OutOfOrderSequenceNumber.into())
750 .base_offset(-1)
751 .log_append_time_ms(Some(-1))
752 .log_start_offset(Some(0))
753 .record_errors(Some(vec![]))
754 .error_message(None)
755 .current_leader(None)
756 ]))
757 ]))
758 .throttle_time_ms(Some(0))
759 .node_endpoints(None),
760 request
761 .serve(
762 ctx,
763 ProduceRequest::default()
764 .transactional_id(transactional_id)
765 .acks(acks)
766 .timeout_ms(timeout_ms)
767 .topic_data(topic_data(
768 topic,
769 index,
770 inflated::Batch::builder()
771 .record(Record::builder().value(
772 Bytes::from_static(b"Lorem ipsum dolor sit amet").into()
773 ))
774 .base_sequence(2)
775 .producer_id(producer.producer_id)
776 )?)
777 )
778 .await?
779 );
780
781 Ok(())
782 }
783}