1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
use std::sync::Arc;
use std::collections::HashMap;
use tracing::instrument;
use siphasher::sip::SipHasher;
use async_lock::Mutex;

use dataplane::ReplicaKey;
use dataplane::produce::DefaultProduceRequest;
use dataplane::produce::DefaultPartitionRequest;
use dataplane::produce::DefaultTopicRequest;
use dataplane::batch::{Batch, MemoryRecords};
use dataplane::record::Record;
pub use dataplane::record::{RecordKey, RecordData};

use crate::FluvioError;
use crate::spu::SpuPool;
use fluvio_types::{SpuId, PartitionId};
use crate::sync::StoreContext;
use crate::metadata::partition::PartitionSpec;

/// An interface for producing events to a particular topic
///
/// A `TopicProducer` allows you to send events to the specific
/// topic it was initialized for. Once you have a `TopicProducer`,
/// you can send key/value records to the topic with [`send`] or
/// [`send_all`]. The partition each record is delivered to is not
/// passed in by the caller; it is selected by the producer's
/// internal partitioner from the record's key and value.
///
/// [`send`]: TopicProducer::send
/// [`send_all`]: TopicProducer::send_all
pub struct TopicProducer {
    /// Name of the topic that every record from this producer is sent to.
    topic: String,
    /// Source of the topic/partition metadata and of the sockets to the
    /// SPUs that lead each partition.
    pool: Arc<SpuPool>,
    /// Partitioning strategy. It sits behind an async mutex because
    /// `Partitioner::partition` takes `&mut self` while sending only
    /// has `&self`.
    partitioner: Arc<Mutex<dyn Partitioner + Send + Sync>>,
}

impl TopicProducer {
    pub(crate) fn new(topic: String, pool: Arc<SpuPool>) -> Self {
        let config = PartitionerConfig { partition_count: 1 };
        let partitioner = Arc::new(Mutex::new(SiphashRoundRobinPartitioner::new(config)));
        Self {
            topic,
            pool,
            partitioner,
        }
    }

    /// Sends a key/value record to this producer's Topic.
    ///
    /// The partition that the record will be sent to is derived from the Key.
    ///
    /// # Example
    ///
    /// ```
    /// # use fluvio::{TopicProducer, FluvioError};
    /// # async fn example(producer: &TopicProducer) -> Result<(), FluvioError> {
    /// producer.send("Key", "Value").await?;
    /// # Ok(())
    /// # }
    /// ```
    #[instrument(
        skip(self, key, value),
        fields(topic = %self.topic),
    )]
    pub async fn send<K, V>(&self, key: K, value: V) -> Result<(), FluvioError>
    where
        K: Into<RecordKey>,
        V: Into<RecordData>,
    {
        let record_key = key.into();
        let record_value = value.into();
        self.send_all(Some((record_key, record_value))).await?;
        Ok(())
    }

    #[instrument(
        skip(self, records),
        fields(topic = %self.topic),
    )]
    pub async fn send_all<K, V, I>(&self, records: I) -> Result<(), FluvioError>
    where
        K: Into<RecordKey>,
        V: Into<RecordData>,
        I: IntoIterator<Item = (K, V)>,
    {
        let topics = self.pool.metadata.topics();
        let topic_spec = topics
            .lookup_by_key(&self.topic)
            .await?
            .ok_or_else(|| FluvioError::TopicNotFound(self.topic.to_string()))?
            .spec;
        let partition_count = topic_spec.partitions();
        let partition_config = PartitionerConfig { partition_count };

        let entries = records
            .into_iter()
            .map::<(RecordKey, RecordData), _>(|(k, v)| (k.into(), v.into()))
            .map(Record::from);

        // Calculate the partition for each entry
        // Use a block scope to ensure we drop the partitioner lock
        let records_by_partition = {
            let mut iter = Vec::new();
            let mut partitioner = self.partitioner.lock().await;
            partitioner.update_config(partition_config);

            for record in entries {
                let key = record.key.as_ref().map(|k| k.as_ref());
                let value = record.value.as_ref();
                let partition = partitioner.partition(key, value);
                iter.push((partition, record));
            }
            iter
        };

        // Group all of the records by the partitions they belong to, then
        // group all of the partitions by the SpuId that leads that partition
        let partitions_by_spu = group_by_spu(
            &self.topic,
            self.pool.metadata.partitions(),
            records_by_partition,
        )
        .await?;

        // Create one request per SPU leader
        let requests = assemble_requests(&self.topic, partitions_by_spu);

        for (leader, request) in requests {
            let spu_client = self.pool.create_serial_socket_from_leader(leader).await?;
            spu_client.send_receive(request).await?;
        }

        Ok(())
    }
}

/// Groups records by the SPU that leads their partition, then by partition.
///
/// `records_by_partition` pairs each record with the partition it was
/// assigned to within `topic`. The leader of each partition is read from
/// the `partitions` metadata store. Records keep their relative input order
/// inside each partition's list.
///
/// The leader is looked up once per distinct partition and reused for the
/// remaining records of that partition, so all records of one partition end
/// up under the same leader within a single call.
///
/// # Errors
///
/// Returns [`FluvioError::PartitionNotFound`] if a partition has no entry
/// in the metadata store, or the error reported by the lookup itself.
async fn group_by_spu(
    topic: &str,
    partitions: &StoreContext<PartitionSpec>,
    records_by_partition: Vec<(PartitionId, Record)>,
) -> Result<HashMap<SpuId, HashMap<PartitionId, MemoryRecords>>, FluvioError> {
    // Leaders resolved so far; avoids one awaited lookup per record
    let mut leaders: HashMap<PartitionId, SpuId> = HashMap::new();
    let mut grouped: HashMap<SpuId, HashMap<PartitionId, MemoryRecords>> = HashMap::new();

    for (partition, record) in records_by_partition {
        let leader = match leaders.get(&partition).copied() {
            Some(leader) => leader,
            None => {
                let replica_key = ReplicaKey::new(topic, partition);
                let leader = partitions
                    .lookup_by_key(&replica_key)
                    .await?
                    .ok_or_else(|| FluvioError::PartitionNotFound(topic.to_string(), partition))?
                    .spec
                    .leader;
                leaders.insert(partition, leader);
                leader
            }
        };

        grouped
            .entry(leader)
            .or_default()
            .entry(partition)
            .or_default()
            .push(record);
    }

    Ok(grouped)
}

/// Builds one produce request per SPU leader.
///
/// Each request carries a single topic entry named `topic`. That entry has
/// one partition request per partition led by the SPU, and every partition
/// request holds exactly one batch made from that partition's records.
/// All requests use `acks = 1` and a timeout of 1500 ms.
///
/// The order of the returned requests, and of the partitions inside each
/// request, follows `HashMap` iteration order and is therefore unspecified.
fn assemble_requests(
    topic: &str,
    partitions_by_spu: HashMap<SpuId, HashMap<PartitionId, MemoryRecords>>,
) -> Vec<(SpuId, DefaultProduceRequest)> {
    partitions_by_spu
        .into_iter()
        .map(|(leader, partitions)| {
            // One partition request (with a single batch) per partition
            let partition_requests = partitions.into_iter().map(|(partition, records)| {
                let mut partition_request = DefaultPartitionRequest {
                    partition_index: partition,
                    ..Default::default()
                };
                partition_request.records.batches.push(Batch::from(records));
                partition_request
            });

            let mut topic_request = DefaultTopicRequest {
                name: topic.to_string(),
                ..Default::default()
            };
            topic_request.partitions.extend(partition_requests);

            let mut request = DefaultProduceRequest::default();
            request.acks = 1;
            request.timeout_ms = 1500;
            request.topics.push(topic_request);

            (leader, request)
        })
        .collect()
}

/// A trait for defining a partitioning strategy for key/value records.
///
/// A Partitioner is asked to place one record at a time: it is given the
/// record's optional key and its value, and returns the partition that
/// the record should be sent to. The number of partitions in the current
/// Topic is not passed with each record; it is delivered separately
/// through [`Partitioner::update_config`].
///
/// It is up to the implementor to decide how records get mapped to
/// partitions. This includes deciding what partition to assign to records
/// with no key (represented by a `None` key).
///
/// See [`SiphashRoundRobinPartitioner`] for a reference implementation.
trait Partitioner {
    /// Returns the partition for a record with the given `key` and `value`.
    ///
    /// Takes `&mut self` so that an implementation may keep state between
    /// calls.
    fn partition(&mut self, key: Option<&[u8]>, value: &[u8]) -> PartitionId;
    /// Replaces the partitioner's configuration with `config`.
    fn update_config(&mut self, config: PartitionerConfig);
}

/// Settings handed to a `Partitioner` through `update_config`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct PartitionerConfig {
    /// Number of partitions in the topic that records are distributed over.
    partition_count: i32,
}

/// A [`Partitioner`] which combines hashing and round-robin partition assignment
///
/// - Records with keys get their keys hashed with siphash
/// - Records without keys get assigned to partitions using round-robin
struct SiphashRoundRobinPartitioner {
    /// Partition that the next keyless record will be assigned to.
    index: PartitionId,
    /// Current configuration; supplies the partition count used by both
    /// the hashing and the round-robin strategy.
    config: PartitionerConfig,
}

impl SiphashRoundRobinPartitioner {
    /// Builds a partitioner that uses `config` and whose round-robin
    /// rotation for keyless records starts at partition 0.
    pub fn new(config: PartitionerConfig) -> Self {
        let index = 0;
        Self { config, index }
    }
}

impl Partitioner for SiphashRoundRobinPartitioner {
    fn partition(&mut self, maybe_key: Option<&[u8]>, _value: &[u8]) -> i32 {
        match maybe_key {
            Some(key) => partition_siphash(key, self.config.partition_count),
            None => {
                let partition = self.index;
                self.index = (self.index + 1) % self.config.partition_count;
                partition
            }
        }
    }

    fn update_config(&mut self, config: PartitionerConfig) {
        self.config = config;
    }
}

/// Maps `key` onto a partition in `0..partition_count` by hashing it with
/// `SipHasher` and reducing the hash modulo `partition_count`.
///
/// # Panics
///
/// Panics if `partition_count` is not greater than zero.
fn partition_siphash(key: &[u8], partition_count: i32) -> i32 {
    use std::hash::{Hash, Hasher};
    use std::convert::TryFrom;

    // Zero has to be rejected as well as negative counts: it would otherwise
    // surface as an unrelated remainder-by-zero panic further down.
    assert!(
        partition_count > 0,
        "Partition count must be greater than zero"
    );
    let partition_count =
        u64::try_from(partition_count).expect("a positive i32 always fits in a u64");

    let mut hasher = SipHasher::new();
    key.hash(&mut hasher);
    let hashed = hasher.finish();

    // The remainder is strictly below an i32 value, so it fits in an i32.
    i32::try_from(hashed % partition_count)
        .expect("a remainder below an i32 partition count fits in an i32")
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::metadata::store::MetadataStoreObject;

    /// Returns true when some record in `records` has exactly `expected` as its value.
    fn has_value(records: &MemoryRecords, expected: &[u8]) -> bool {
        records.iter().any(|record| {
            let value: &[u8] = record.value.as_ref();
            value == expected
        })
    }

    /// Finds the request addressed to `spu_id`, checks that it carries one
    /// topic entry named "TOPIC" with two partitions, and returns that entry.
    fn topic_request_for(
        requests: &[(SpuId, DefaultProduceRequest)],
        spu_id: SpuId,
    ) -> &DefaultTopicRequest {
        let (found, request) = requests.iter().find(|(spu, _)| *spu == spu_id).unwrap();
        assert_eq!(*found, spu_id);
        assert_eq!(request.topics.len(), 1);
        let topic_request = request.topics.get(0).unwrap();
        assert_eq!(topic_request.name, "TOPIC");
        assert_eq!(topic_request.partitions.len(), 2);
        topic_request
    }

    /// Checks that `topic_request` has an entry for `partition` holding one
    /// batch whose two records carry `first` and `second`, in that order.
    fn assert_partition_batch(
        topic_request: &DefaultTopicRequest,
        partition: PartitionId,
        first: &[u8],
        second: &[u8],
    ) {
        let partition_request = topic_request
            .partitions
            .iter()
            .find(|p| p.partition_index == partition)
            .unwrap();
        assert_eq!(partition_request.records.batches.len(), 1);
        let batch = partition_request.records.batches.get(0).unwrap();
        assert_eq!(batch.records().len(), 2);
        let first_value: &[u8] = batch.records().get(0).unwrap().value.as_ref();
        assert_eq!(first_value, first);
        let second_value: &[u8] = batch.records().get(1).unwrap().value.as_ref();
        assert_eq!(second_value, second);
    }

    /// Ensure that feeding keyless records one-at-a-time does not assign the same partition
    #[test]
    fn test_round_robin_individual() {
        let mut partitioner =
            SiphashRoundRobinPartitioner::new(PartitionerConfig { partition_count: 3 });

        // Two full rotations over the three partitions
        for &expected in [0, 1, 2, 0, 1, 2].iter() {
            let assigned = partitioner.partition(None, &[]);
            assert_eq!(assigned, expected);
        }
    }

    /// Records must be bucketed under the SPU that leads their partition
    #[fluvio_future::test]
    async fn test_group_by_spu() {
        // Partitions 0 and 1 are led by SPU 0, partition 2 by SPU 1
        let partition_specs: Vec<_> = vec![
            (ReplicaKey::new("TOPIC", 0), PartitionSpec::new(0, vec![])),
            (ReplicaKey::new("TOPIC", 1), PartitionSpec::new(0, vec![])),
            (ReplicaKey::new("TOPIC", 2), PartitionSpec::new(1, vec![])),
        ]
        .into_iter()
        .map(|(key, spec)| MetadataStoreObject::with_spec(key, spec))
        .collect();
        let partitions = StoreContext::new();
        partitions.store().sync_all(partition_specs).await;

        let records_by_partition = vec![
            (0, Record::new("A")),
            (1, Record::new("B")),
            (2, Record::new("C")),
            (0, Record::new("D")),
            (1, Record::new("E")),
            (2, Record::new("F")),
        ];

        let grouped = group_by_spu("TOPIC", &partitions, records_by_partition)
            .await
            .unwrap();

        // SPU 0 should have partitions 0 and 1, but not 2
        let spu_0 = grouped.get(&0).unwrap();

        let partition_0 = spu_0.get(&0).unwrap();
        assert!(
            has_value(partition_0, b"A"),
            "SPU 0/Partition 0 should contain record A",
        );
        assert!(
            has_value(partition_0, b"D"),
            "SPU 0/Partition 0 should contain record D",
        );

        let partition_1 = spu_0.get(&1).unwrap();
        assert!(
            has_value(partition_1, b"B"),
            "SPU 0/Partition 1 should contain record B",
        );
        assert!(
            has_value(partition_1, b"E"),
            "SPU 0/Partition 1 should contain record E",
        );

        assert!(
            !spu_0.contains_key(&2),
            "SPU 0 should not contain Partition 2",
        );

        // SPU 1 should have partition 2 only
        let spu_1 = grouped.get(&1).unwrap();
        let partition_2 = spu_1.get(&2).unwrap();

        assert!(
            !spu_1.contains_key(&0),
            "SPU 1 should not contain Partition 0",
        );
        assert!(
            !spu_1.contains_key(&1),
            "SPU 1 should not contain Partition 1",
        );

        assert!(
            has_value(partition_2, b"C"),
            "SPU 1/Partition 2 should contain record C",
        );
        assert!(
            has_value(partition_2, b"F"),
            "SPU 1/Partition 2 should contain record F",
        );
    }

    /// Each SPU must get one request with one single-batch entry per partition
    #[test]
    fn test_assemble_requests() {
        // SPU 0 leads partitions 0 and 1
        let spu_0: HashMap<_, _> = vec![
            (0, vec![Record::new("A"), Record::new("B")]),
            (1, vec![Record::new("C"), Record::new("D")]),
        ]
        .into_iter()
        .collect();

        // SPU 1 leads partitions 2 and 3
        let spu_1: HashMap<_, _> = vec![
            (2, vec![Record::new("E"), Record::new("F")]),
            (3, vec![Record::new("G"), Record::new("H")]),
        ]
        .into_iter()
        .collect();

        let partitions_by_spu: HashMap<_, _> =
            vec![(0, spu_0), (1, spu_1)].into_iter().collect();

        let requests = assemble_requests("TOPIC", partitions_by_spu);
        assert_eq!(requests.len(), 2);

        // SPU 0
        let spu_0_topic = topic_request_for(&requests, 0);
        assert_partition_batch(spu_0_topic, 0, b"A", b"B");
        assert_partition_batch(spu_0_topic, 1, b"C", b"D");

        // SPU 1
        let spu_1_topic = topic_request_for(&requests, 1);
        assert_partition_batch(spu_1_topic, 2, b"E", b"F");
        assert_partition_batch(spu_1_topic, 3, b"G", b"H");
    }
}