1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
use super::{ObjectError, Result, Writer};
use crate::{crypto::Digest, ChunkPointer};

use flume as mpsc;

#[derive(Clone)]
pub struct RoundRobinBalancer<W> {
    enqueue: mpsc::Sender<W>,
    dequeue: mpsc::Receiver<W>,
    writers: usize,
}

impl<W: 'static + Writer + Clone> RoundRobinBalancer<W> {
    pub fn new(writer: W, writers: usize) -> Result<Self> {
        let (enqueue, dequeue) = mpsc::bounded(writers);

        for _ in 0..writers {
            enqueue
                .send(writer.clone())
                .map_err(|_| ObjectError::Fatal)?;
        }

        Ok(RoundRobinBalancer {
            enqueue,
            dequeue,
            writers,
        })
    }
}

impl<W: 'static + Writer> Writer for RoundRobinBalancer<W> {
    fn write_chunk(&mut self, hash: &Digest, data: &[u8]) -> Result<ChunkPointer> {
        let mut writer = self.dequeue.recv().map_err(|_| ObjectError::Fatal)?;
        let result = writer.write_chunk(hash, data);
        self.enqueue.send(writer).map_err(|_| ObjectError::Fatal)?;

        result
    }

    fn flush(&mut self) -> Result<()> {
        for _ in 0..self.writers {
            let mut writer = self.dequeue.recv().map_err(|_| ObjectError::Fatal)?;
            writer.flush()?;
            self.enqueue.send(writer).map_err(|_| ObjectError::Fatal)?;
        }

        Ok(())
    }
}