timely_communication/allocator/
generic.rs

1//! A generic allocator, wrapping known implementors of `Allocate`.
2//!
3//! This type is useful in settings where it is difficult to write code generic in `A: Allocate`,
4//! for example closures whose type arguments must be specified.
5
6use std::rc::Rc;
7use std::cell::RefCell;
8use std::collections::VecDeque;
9
10use crate::allocator::thread::ThreadBuilder;
11use crate::allocator::process::ProcessBuilder as TypedProcessBuilder;
12use crate::allocator::{Allocate, AllocateBuilder, Event, Thread, Process};
13use crate::allocator::zero_copy::allocator_process::{ProcessBuilder, ProcessAllocator};
14use crate::allocator::zero_copy::allocator::{TcpBuilder, TcpAllocator};
15
16use crate::{Push, Pull, Data, Message};
17
18/// Enumerates known implementors of `Allocate`.
19/// Passes trait method calls on to members.
20pub enum Generic {
21    /// Intra-thread allocator.
22    Thread(Thread),
23    /// Inter-thread, intra-process allocator.
24    Process(Process),
25    /// Inter-thread, intra-process serializing allocator.
26    ProcessBinary(ProcessAllocator),
27    /// Inter-process allocator.
28    ZeroCopy(TcpAllocator<Process>),
29}
30
31impl Generic {
32    /// The index of the worker out of `(0..self.peers())`.
33    pub fn index(&self) -> usize {
34        match self {
35            Generic::Thread(t) => t.index(),
36            Generic::Process(p) => p.index(),
37            Generic::ProcessBinary(pb) => pb.index(),
38            Generic::ZeroCopy(z) => z.index(),
39        }
40    }
41    /// The number of workers.
42    pub fn peers(&self) -> usize {
43        match self {
44            Generic::Thread(t) => t.peers(),
45            Generic::Process(p) => p.peers(),
46            Generic::ProcessBinary(pb) => pb.peers(),
47            Generic::ZeroCopy(z) => z.peers(),
48        }
49    }
50    /// Constructs several send endpoints and one receive endpoint.
51    fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>) {
52        match self {
53            Generic::Thread(t) => t.allocate(identifier),
54            Generic::Process(p) => p.allocate(identifier),
55            Generic::ProcessBinary(pb) => pb.allocate(identifier),
56            Generic::ZeroCopy(z) => z.allocate(identifier),
57        }
58    }
59    /// Perform work before scheduling operators.
60    fn receive(&mut self) {
61        match self {
62            Generic::Thread(t) => t.receive(),
63            Generic::Process(p) => p.receive(),
64            Generic::ProcessBinary(pb) => pb.receive(),
65            Generic::ZeroCopy(z) => z.receive(),
66        }
67    }
68    /// Perform work after scheduling operators.
69    pub fn release(&mut self) {
70        match self {
71            Generic::Thread(t) => t.release(),
72            Generic::Process(p) => p.release(),
73            Generic::ProcessBinary(pb) => pb.release(),
74            Generic::ZeroCopy(z) => z.release(),
75        }
76    }
77    fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> {
78        match self {
79            Generic::Thread(ref t) => t.events(),
80            Generic::Process(ref p) => p.events(),
81            Generic::ProcessBinary(ref pb) => pb.events(),
82            Generic::ZeroCopy(ref z) => z.events(),
83        }
84    }
85}
86
87impl Allocate for Generic {
88    fn index(&self) -> usize { self.index() }
89    fn peers(&self) -> usize { self.peers() }
90    fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>) {
91        self.allocate(identifier)
92    }
93
94    fn receive(&mut self) { self.receive(); }
95    fn release(&mut self) { self.release(); }
96    fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> { self.events() }
97    fn await_events(&self, _duration: Option<std::time::Duration>) {
98        match self {
99            Generic::Thread(t) => t.await_events(_duration),
100            Generic::Process(p) => p.await_events(_duration),
101            Generic::ProcessBinary(pb) => pb.await_events(_duration),
102            Generic::ZeroCopy(z) => z.await_events(_duration),
103        }
104    }
105}
106
107
108/// Enumerations of constructable implementors of `Allocate`.
109///
110/// The builder variants are meant to be `Send`, so that they can be moved across threads,
111/// whereas the allocator they construct may not. As an example, the `ProcessBinary` type
112/// contains `Rc` wrapped state, and so cannot itself be moved across threads.
113pub enum GenericBuilder {
114    /// Builder for `Thread` allocator.
115    Thread(ThreadBuilder),
116    /// Builder for `Process` allocator.
117    Process(TypedProcessBuilder),
118    /// Builder for `ProcessBinary` allocator.
119    ProcessBinary(ProcessBuilder),
120    /// Builder for `ZeroCopy` allocator.
121    ZeroCopy(TcpBuilder<TypedProcessBuilder>),
122}
123
124impl AllocateBuilder for GenericBuilder {
125    type Allocator = Generic;
126    fn build(self) -> Generic {
127        match self {
128            GenericBuilder::Thread(t) => Generic::Thread(t.build()),
129            GenericBuilder::Process(p) => Generic::Process(p.build()),
130            GenericBuilder::ProcessBinary(pb) => Generic::ProcessBinary(pb.build()),
131            GenericBuilder::ZeroCopy(z) => Generic::ZeroCopy(z.build()),
132        }
133    }
134}