1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
use std::sync::{
    Arc, Mutex,
    mpsc,
    mpsc::Sender
};

use crate::{
    Bottle,
    Heater,
    Ice, IceBox,
};



/// Ice Melter.
///
/// This struct is responsible for queueing all the Ice to be molten using it's Heaters.
pub struct Melter {
    heaters: Vec<Heater>,
    ice_boxes: Sender<IceBox>
}

impl Melter {
    /// Creates a new Ice Melter with specified amount of Heaters.
    ///
    /// # Panics
    ///
    /// When count is zero.
    pub fn new(count: usize) -> Melter {
        assert!(count > 0, "ice_threads::Melter::new -> Heater count must be greater than 0!");

        let mut heaters = Vec::with_capacity(count);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));
        for _ in 0..count {
            heaters.push(Heater::new(receiver.clone()));
        }

        Melter {
            heaters,
            ice_boxes: sender
        }
    }

    /// Enqueues Ice to be molten.
    ///
    /// Returns a Bottle which will be filled with Water coming from given Ice.
    ///
    /// # Panics
    ///
    /// When all Heaters fail.
    ///
    /// This should generally be impossible unless the user's code happens to have a bug, panics
    /// and doesn't open the poisoned [`Bottle`] that would stop the process earlier.
    ///
    /// [`Bottle`]: struct.Bottle.html
    #[must_use]
    pub fn melt<I, W>(&self, ice: I) -> Bottle<W>
    where
        I: FnOnce() -> W + Send + 'static,
        W: Send + 'static
    {
        let (sender, receiver) = mpsc::channel::<W>();
        let ice_box = IceBox::Some(
            Ice::new(move || {
                sender.send(ice()).ok();
            })
        );

        self.ice_boxes
            .send(ice_box)
            .expect("ice_threads::Melter::melt -> Failed to send Ice box to a Heater group");

        Bottle::new(receiver)
    }
}

impl Drop for Melter {
    /// Gracefully powers down all Heaters.
    fn drop(&mut self) {
        for _ in &self.heaters {
            if self.ice_boxes.send(IceBox::None).is_err() {
                break;
            }
        }

        for heater in &mut self.heaters {
            if let Some(core) = heater.take_core() {
                core.join().ok();
            }
        }
    }
}