Skip to main content

tansu_storage/service/
produce.rs

1// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use rama::{Context, Service};
16use tansu_sans_io::{
17    ApiKey, ErrorCode, ProduceRequest, ProduceResponse,
18    produce_request::{PartitionProduceData, TopicProduceData},
19    produce_response::{PartitionProduceResponse, TopicProduceResponse},
20};
21use tracing::{debug, error, instrument, warn};
22
23use crate::{Error, Result, Storage, Topition};
24
25/// A [`Service`] using [`Storage`] as [`Context`] taking [`ProduceRequest`] returning [`ProduceResponse`].
26/// ```
27/// use bytes::Bytes;
28/// use rama::{Context, Layer as _, Service as _, layer::MapStateLayer};
29/// use tansu_sans_io::{
30///     CreateTopicsRequest, ErrorCode, ProduceRequest,
31///     create_topics_request::CreatableTopic,
32///     produce_request::{PartitionProduceData, TopicProduceData},
33///     record::{Record, deflated::Frame, inflated},
34/// };
35/// use tansu_storage::{CreateTopicsService, Error, ProduceService, StorageContainer};
36/// use url::Url;
37///
38/// # #[tokio::main]
39/// # async fn main() -> Result<(), Error> {
40/// const CLUSTER_ID: &str = "tansu";
41/// const NODE_ID: i32 = 111;
42/// const HOST: &str = "localhost";
43/// const PORT: i32 = 9092;
44///
45/// let storage = StorageContainer::builder()
46///     .cluster_id(CLUSTER_ID)
47///     .node_id(NODE_ID)
48///     .advertised_listener(Url::parse(&format!("tcp://{HOST}:{PORT}"))?)
49///     .storage(Url::parse("memory://tansu/")?)
50///     .build()
51///     .await?;
52///
53/// let create_topic = {
54///     let storage = storage.clone();
55///     MapStateLayer::new(|_| storage).into_layer(CreateTopicsService)
56/// };
57///
58/// let name = "abcba";
59///
60/// let response = create_topic
61///     .serve(
62///         Context::default(),
63///         CreateTopicsRequest::default()
64///             .topics(Some(vec![
65///                 CreatableTopic::default()
66///                     .name(name.into())
67///                     .num_partitions(5)
68///                     .replication_factor(3)
69///                     .assignments(Some([].into()))
70///                     .configs(Some([].into())),
71///             ]))
72///             .validate_only(Some(false)),
73///     )
74///     .await?;
75///
76/// let topics = response.topics.unwrap_or_default();
77/// assert_eq!(1, topics.len());
78/// assert_eq!(ErrorCode::None, ErrorCode::try_from(topics[0].error_code)?);
79///
80/// let produce = {
81///     let storage = storage.clone();
82///     MapStateLayer::new(|_| storage).into_layer(ProduceService)
83/// };
84///
85/// let partition = 0;
86///
87/// let response = produce
88///     .serve(
89///         Context::default(),
90///         ProduceRequest::default().topic_data(Some(
91///             [TopicProduceData::default()
92///                 .name(name.into())
93///                 .partition_data(Some(
94///                     [PartitionProduceData::default()
95///                         .index(partition)
96///                         .records(Some(Frame {
97///                             batches: vec![
98///                                 inflated::Batch::builder()
99///                                     .record(
100///                                         Record::builder().value(
101///                                             Bytes::from_static(
102///                                                 b"Lorem ipsum dolor sit amet",
103///                                             )
104///                                             .into(),
105///                                         ),
106///                                     )
107///                                     .build()
108///                                     .and_then(TryInto::try_into)?,
109///                             ],
110///                         }))]
111///                     .into(),
112///                 ))]
113///             .into(),
114///         )),
115///     )
116///     .await?;
117///
118/// let topics = response.responses.as_deref().unwrap_or_default();
119/// assert_eq!(1, topics.len());
120/// let partitions = topics[0].partition_responses.as_deref().unwrap_or_default();
121/// assert_eq!(1, partitions.len());
122/// assert_eq!(
123///     ErrorCode::None,
124///     ErrorCode::try_from(partitions[0].error_code)?
125/// );
126/// # Ok(())
127/// # }
128/// ```
129#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
130pub struct ProduceService;
131
132impl ApiKey for ProduceService {
133    const KEY: i16 = ProduceRequest::KEY;
134}
135
136impl ProduceService {
137    fn error(&self, index: i32, error_code: ErrorCode) -> PartitionProduceResponse {
138        PartitionProduceResponse::default()
139            .index(index)
140            .error_code(error_code.into())
141            .base_offset(-1)
142            .log_append_time_ms(Some(-1))
143            .log_start_offset(Some(0))
144            .record_errors(Some([].into()))
145            .error_message(None)
146            .current_leader(None)
147    }
148
149    #[instrument(skip_all)]
150    async fn partition<G>(
151        &self,
152        ctx: Context<G>,
153        transaction_id: Option<&str>,
154        name: &str,
155        partition: PartitionProduceData,
156    ) -> PartitionProduceResponse
157    where
158        G: Storage,
159    {
160        if let Some(records) = partition.records {
161            let mut base_offset = None;
162
163            for batch in records.batches {
164                let tp = Topition::new(name, partition.index);
165
166                match ctx
167                    .state()
168                    .produce(transaction_id, &tp, batch)
169                    .await
170                    .inspect_err(|err| match err {
171                        storage_api @ Error::Api(_) => {
172                            warn!(?storage_api)
173                        }
174                        otherwise => error!(?otherwise),
175                    }) {
176                    Ok(offset) => _ = base_offset.get_or_insert(offset),
177
178                    Err(Error::Api(error_code)) => {
179                        debug!(?self, ?error_code);
180                        return self.error(partition.index, error_code);
181                    }
182
183                    Err(otherwise) => {
184                        warn!(?otherwise);
185                        let error = self.error(partition.index, ErrorCode::UnknownServerError);
186                        return error;
187                    }
188                }
189            }
190
191            if let Some(base_offset) = base_offset {
192                PartitionProduceResponse::default()
193                    .index(partition.index)
194                    .error_code(ErrorCode::None.into())
195                    .base_offset(base_offset)
196                    .log_append_time_ms(Some(-1))
197                    .log_start_offset(Some(0))
198                    .record_errors(Some([].into()))
199                    .error_message(None)
200                    .current_leader(None)
201            } else {
202                self.error(partition.index, ErrorCode::UnknownServerError)
203            }
204        } else {
205            self.error(partition.index, ErrorCode::UnknownServerError)
206        }
207    }
208
209    #[instrument(skip_all)]
210    async fn topic<G>(
211        &self,
212        ctx: Context<G>,
213        transaction_id: Option<&str>,
214        topic: TopicProduceData,
215    ) -> TopicProduceResponse
216    where
217        G: Storage,
218    {
219        let mut partitions = vec![];
220
221        if let Some(partition_data) = topic.partition_data {
222            for partition in partition_data {
223                partitions.push(
224                    self.partition(ctx.clone(), transaction_id, &topic.name, partition)
225                        .await,
226                )
227            }
228        }
229
230        TopicProduceResponse::default()
231            .name(topic.name)
232            .partition_responses(Some(partitions))
233    }
234}
235
236impl<G> Service<G, ProduceRequest> for ProduceService
237where
238    G: Storage,
239{
240    type Response = ProduceResponse;
241    type Error = Error;
242
243    #[instrument(skip(ctx, req))]
244    async fn serve(
245        &self,
246        ctx: Context<G>,
247        req: ProduceRequest,
248    ) -> Result<Self::Response, Self::Error> {
249        let mut responses = Vec::with_capacity(
250            req.topic_data
251                .as_ref()
252                .map_or(0, |topic_data| topic_data.len()),
253        );
254
255        if let Some(topics) = req.topic_data {
256            for topic in topics {
257                responses.push(
258                    self.topic(ctx.clone(), req.transactional_id.as_deref(), topic)
259                        .await,
260                )
261            }
262        }
263
264        Ok(ProduceResponse::default()
265            .responses(Some(responses))
266            .throttle_time_ms(Some(0))
267            .node_endpoints(None))
268    }
269}
270
271#[cfg(all(test, feature = "dynostore"))]
272mod tests {
273    use super::*;
274    use crate::{Error, dynostore::DynoStore, service::init_producer_id::InitProducerIdService};
275    use bytes::Bytes;
276    use object_store::memory::InMemory;
277    use rama::Context;
278    use tansu_sans_io::{
279        ErrorCode, InitProducerIdRequest,
280        record::{
281            Record,
282            deflated::{self, Frame},
283            inflated,
284        },
285    };
286    use tracing::subscriber::DefaultGuard;
287
288    fn init_tracing() -> Result<DefaultGuard> {
289        use std::{fs::File, sync::Arc, thread};
290
291        use tracing::Level;
292        use tracing_subscriber::fmt::format::FmtSpan;
293
294        Ok(tracing::subscriber::set_default(
295            tracing_subscriber::fmt()
296                .with_level(true)
297                .with_line_number(true)
298                .with_thread_names(false)
299                .with_max_level(Level::DEBUG)
300                .with_span_events(FmtSpan::ACTIVE)
301                .with_writer(
302                    thread::current()
303                        .name()
304                        .ok_or(Error::Message(String::from("unnamed thread")))
305                        .and_then(|name| {
306                            File::create(format!("../logs/{}/{name}.log", env!("CARGO_PKG_NAME")))
307                                .map_err(Into::into)
308                        })
309                        .map(Arc::new)?,
310                )
311                .finish(),
312        ))
313    }
314
315    fn topic_data(
316        topic: &str,
317        index: i32,
318        builder: inflated::Builder,
319    ) -> Result<Option<Vec<TopicProduceData>>> {
320        builder
321            .build()
322            .and_then(deflated::Batch::try_from)
323            .map(|deflated| {
324                let partition_data =
325                    PartitionProduceData::default()
326                        .index(index)
327                        .records(Some(Frame {
328                            batches: vec![deflated],
329                        }));
330
331                Some(vec![
332                    TopicProduceData::default()
333                        .name(topic.into())
334                        .partition_data(Some(vec![partition_data])),
335                ])
336            })
337            .map_err(Into::into)
338    }
339
340    #[tokio::test]
341    async fn non_txn_idempotent_unknown_producer_id() -> Result<()> {
342        let _guard = init_tracing()?;
343
344        let cluster = "abc";
345        let node = 12321;
346
347        let topic = "pqr";
348        let index = 0;
349
350        let transactional_id = None;
351        let acks = 0;
352        let timeout_ms = 0;
353
354        let storage = DynoStore::new(cluster, node, InMemory::new());
355        let ctx = Context::with_state(storage);
356        let service = ProduceService;
357
358        assert_eq!(
359            ProduceResponse::default()
360                .responses(Some(vec![
361                    TopicProduceResponse::default()
362                        .name(topic.into())
363                        .partition_responses(Some(vec![
364                            PartitionProduceResponse::default()
365                                .index(index)
366                                .error_code(ErrorCode::UnknownProducerId.into())
367                                .base_offset(-1)
368                                .log_append_time_ms(Some(-1))
369                                .log_start_offset(Some(0))
370                                .record_errors(Some(vec![]))
371                                .error_message(None)
372                                .current_leader(None)
373                        ]))
374                ]))
375                .throttle_time_ms(Some(0))
376                .node_endpoints(None),
377            service
378                .serve(
379                    ctx,
380                    ProduceRequest::default()
381                        .transactional_id(transactional_id)
382                        .acks(acks)
383                        .timeout_ms(timeout_ms)
384                        .topic_data(topic_data(
385                            topic,
386                            index,
387                            inflated::Batch::builder()
388                                .record(
389                                    Record::builder().value(Bytes::from_static(b"lorem").into())
390                                )
391                                .producer_id(54345)
392                        )?)
393                )
394                .await?
395        );
396
397        Ok(())
398    }
399
400    #[tokio::test]
401    async fn non_txn_idempotent() -> Result<()> {
402        let _guard = init_tracing()?;
403
404        let cluster = "abc";
405        let node = 12321;
406        let topic = "pqr";
407        let index = 0;
408
409        let storage = DynoStore::new(cluster, node, InMemory::new());
410        let ctx = Context::with_state(storage);
411
412        let init_producer_id = InitProducerIdService;
413
414        let producer = init_producer_id
415            .serve(
416                ctx.clone(),
417                InitProducerIdRequest::default()
418                    .transactional_id(None)
419                    .transaction_timeout_ms(0)
420                    .producer_id(Some(-1))
421                    .producer_epoch(Some(-1)),
422            )
423            .await?;
424
425        let request = ProduceService;
426
427        let transactional_id = None;
428        let acks = 0;
429        let timeout_ms = 0;
430
431        assert_eq!(
432            ProduceResponse::default()
433                .responses(Some(vec![
434                    TopicProduceResponse::default()
435                        .name(topic.into())
436                        .partition_responses(Some(vec![
437                            PartitionProduceResponse::default()
438                                .index(index)
439                                .error_code(ErrorCode::None.into())
440                                .base_offset(0)
441                                .log_append_time_ms(Some(-1))
442                                .log_start_offset(Some(0))
443                                .record_errors(Some(vec![]))
444                                .error_message(None)
445                                .current_leader(None)
446                        ]))
447                ]))
448                .throttle_time_ms(Some(0))
449                .node_endpoints(None),
450            request
451                .serve(
452                    ctx.clone(),
453                    ProduceRequest::default()
454                        .transactional_id(transactional_id.clone())
455                        .acks(acks)
456                        .timeout_ms(timeout_ms)
457                        .topic_data(topic_data(
458                            topic,
459                            index,
460                            inflated::Batch::builder()
461                                .record(Record::builder().value(
462                                    Bytes::from_static(b"Lorem ipsum dolor sit amet").into()
463                                ))
464                                .producer_id(producer.producer_id)
465                        )?)
466                )
467                .await?
468        );
469
470        assert_eq!(
471            ProduceResponse::default()
472                .responses(Some(vec![
473                    TopicProduceResponse::default()
474                        .name(topic.into())
475                        .partition_responses(Some(vec![
476                            PartitionProduceResponse::default()
477                                .index(index)
478                                .error_code(ErrorCode::None.into())
479                                .base_offset(1)
480                                .log_append_time_ms(Some(-1))
481                                .log_start_offset(Some(0))
482                                .record_errors(Some(vec![]))
483                                .error_message(None)
484                                .current_leader(None)
485                        ]))
486                ]))
487                .throttle_time_ms(Some(0))
488                .node_endpoints(None),
489            request
490                .serve(
491                    ctx.clone(),
492                    ProduceRequest::default()
493                        .transactional_id(transactional_id.clone())
494                        .acks(acks)
495                        .timeout_ms(timeout_ms)
496                        .topic_data(topic_data(
497                            topic,
498                            index,
499                            inflated::Batch::builder()
500                                .record(Record::builder().value(
501                                    Bytes::from_static(b"consectetur adipiscing elit").into()
502                                ))
503                                .record(
504                                    Record::builder()
505                                        .value(Bytes::from_static(b"sed do eiusmod tempor").into())
506                                )
507                                .base_sequence(1)
508                                .last_offset_delta(1)
509                                .producer_id(producer.producer_id)
510                        )?)
511                )
512                .await?
513        );
514
515        assert_eq!(
516            ProduceResponse::default()
517                .responses(Some(vec![
518                    TopicProduceResponse::default()
519                        .name(topic.into())
520                        .partition_responses(Some(vec![
521                            PartitionProduceResponse::default()
522                                .index(index)
523                                .error_code(ErrorCode::None.into())
524                                .base_offset(3)
525                                .log_append_time_ms(Some(-1))
526                                .log_start_offset(Some(0))
527                                .record_errors(Some(vec![]))
528                                .error_message(None)
529                                .current_leader(None)
530                        ]))
531                ]))
532                .throttle_time_ms(Some(0))
533                .node_endpoints(None),
534            request
535                .serve(
536                    ctx,
537                    ProduceRequest::default()
538                        .transactional_id(transactional_id.clone())
539                        .acks(acks)
540                        .timeout_ms(timeout_ms)
541                        .topic_data(topic_data(
542                            topic,
543                            index,
544                            inflated::Batch::builder()
545                                .record(
546                                    Record::builder()
547                                        .value(Bytes::from_static(b"incididunt ut labore").into())
548                                )
549                                .base_sequence(3)
550                                .producer_id(producer.producer_id)
551                        )?)
552                )
553                .await?
554        );
555
556        Ok(())
557    }
558
559    #[tokio::test]
560    async fn non_txn_idempotent_duplicate_sequence() -> Result<()> {
561        let _guard = init_tracing()?;
562
563        let cluster = "abc";
564        let node = 12321;
565        let topic = "pqr";
566        let index = 0;
567
568        let storage = DynoStore::new(cluster, node, InMemory::new());
569        let ctx = Context::with_state(storage);
570
571        let init_producer_id = InitProducerIdService;
572
573        let producer = init_producer_id
574            .serve(
575                ctx.clone(),
576                InitProducerIdRequest::default()
577                    .transactional_id(None)
578                    .transaction_timeout_ms(0)
579                    .producer_id(Some(-1))
580                    .producer_epoch(Some(-1)),
581            )
582            .await?;
583
584        let request = ProduceService;
585
586        let transactional_id = None;
587        let acks = 0;
588        let timeout_ms = 0;
589
590        assert_eq!(
591            ProduceResponse::default()
592                .responses(Some(vec![
593                    TopicProduceResponse::default()
594                        .name(topic.into())
595                        .partition_responses(Some(vec![
596                            PartitionProduceResponse::default()
597                                .index(index)
598                                .error_code(ErrorCode::None.into())
599                                .base_offset(0)
600                                .log_append_time_ms(Some(-1))
601                                .log_start_offset(Some(0))
602                                .record_errors(Some(vec![]))
603                                .error_message(None)
604                                .current_leader(None)
605                        ]))
606                ]))
607                .throttle_time_ms(Some(0))
608                .node_endpoints(None),
609            request
610                .serve(
611                    ctx.clone(),
612                    ProduceRequest::default()
613                        .transactional_id(transactional_id.clone())
614                        .acks(acks)
615                        .timeout_ms(timeout_ms)
616                        .topic_data(topic_data(
617                            topic,
618                            index,
619                            inflated::Batch::builder()
620                                .record(Record::builder().value(
621                                    Bytes::from_static(b"Lorem ipsum dolor sit amet").into()
622                                ))
623                                .producer_id(producer.producer_id)
624                        )?)
625                )
626                .await?
627        );
628
629        assert_eq!(
630            ProduceResponse::default()
631                .responses(Some(vec![
632                    TopicProduceResponse::default()
633                        .name(topic.into())
634                        .partition_responses(Some(vec![
635                            PartitionProduceResponse::default()
636                                .index(index)
637                                .error_code(ErrorCode::DuplicateSequenceNumber.into())
638                                .base_offset(-1)
639                                .log_append_time_ms(Some(-1))
640                                .log_start_offset(Some(0))
641                                .record_errors(Some(vec![]))
642                                .error_message(None)
643                                .current_leader(None)
644                        ]))
645                ]))
646                .throttle_time_ms(Some(0))
647                .node_endpoints(None),
648            request
649                .serve(
650                    ctx,
651                    ProduceRequest::default()
652                        .transactional_id(transactional_id)
653                        .acks(acks)
654                        .timeout_ms(timeout_ms)
655                        .topic_data(topic_data(
656                            topic,
657                            index,
658                            inflated::Batch::builder()
659                                .record(Record::builder().value(
660                                    Bytes::from_static(b"Lorem ipsum dolor sit amet").into()
661                                ))
662                                .producer_id(producer.producer_id)
663                        )?)
664                )
665                .await?
666        );
667
668        Ok(())
669    }
670
671    #[tokio::test]
672    async fn non_txn_idempotent_sequence_out_of_order() -> Result<()> {
673        let _guard = init_tracing()?;
674
675        let cluster = "abc";
676        let node = 12321;
677        let topic = "pqr";
678        let index = 0;
679
680        let storage = DynoStore::new(cluster, node, InMemory::new());
681        let ctx = Context::with_state(storage);
682
683        let init_producer_id = InitProducerIdService;
684
685        let producer = init_producer_id
686            .serve(
687                ctx.clone(),
688                InitProducerIdRequest::default()
689                    .transactional_id(None)
690                    .transaction_timeout_ms(0)
691                    .producer_id(Some(-1))
692                    .producer_epoch(Some(-1)),
693            )
694            .await?;
695
696        let request = ProduceService;
697
698        let transactional_id = None;
699        let acks = 0;
700        let timeout_ms = 0;
701
702        assert_eq!(
703            ProduceResponse::default()
704                .responses(Some(vec![
705                    TopicProduceResponse::default()
706                        .name(topic.into())
707                        .partition_responses(Some(vec![
708                            PartitionProduceResponse::default()
709                                .index(index)
710                                .error_code(ErrorCode::None.into())
711                                .base_offset(0)
712                                .log_append_time_ms(Some(-1))
713                                .log_start_offset(Some(0))
714                                .record_errors(Some(vec![]))
715                                .error_message(None)
716                                .current_leader(None)
717                        ]))
718                ]))
719                .throttle_time_ms(Some(0))
720                .node_endpoints(None),
721            request
722                .serve(
723                    ctx.clone(),
724                    ProduceRequest::default()
725                        .transactional_id(transactional_id.clone())
726                        .acks(acks)
727                        .timeout_ms(timeout_ms)
728                        .topic_data(topic_data(
729                            topic,
730                            index,
731                            inflated::Batch::builder()
732                                .record(Record::builder().value(
733                                    Bytes::from_static(b"Lorem ipsum dolor sit amet").into()
734                                ))
735                                .producer_id(producer.producer_id)
736                        )?)
737                )
738                .await?
739        );
740
741        assert_eq!(
742            ProduceResponse::default()
743                .responses(Some(vec![
744                    TopicProduceResponse::default()
745                        .name(topic.into())
746                        .partition_responses(Some(vec![
747                            PartitionProduceResponse::default()
748                                .index(index)
749                                .error_code(ErrorCode::OutOfOrderSequenceNumber.into())
750                                .base_offset(-1)
751                                .log_append_time_ms(Some(-1))
752                                .log_start_offset(Some(0))
753                                .record_errors(Some(vec![]))
754                                .error_message(None)
755                                .current_leader(None)
756                        ]))
757                ]))
758                .throttle_time_ms(Some(0))
759                .node_endpoints(None),
760            request
761                .serve(
762                    ctx,
763                    ProduceRequest::default()
764                        .transactional_id(transactional_id)
765                        .acks(acks)
766                        .timeout_ms(timeout_ms)
767                        .topic_data(topic_data(
768                            topic,
769                            index,
770                            inflated::Batch::builder()
771                                .record(Record::builder().value(
772                                    Bytes::from_static(b"Lorem ipsum dolor sit amet").into()
773                                ))
774                                .base_sequence(2)
775                                .producer_id(producer.producer_id)
776                        )?)
777                )
778                .await?
779        );
780
781        Ok(())
782    }
783}