matchmaker/nucleo/
injector.rs1use std::{
5 marker::PhantomData,
6 sync::{
7 Arc,
8 atomic::{AtomicU32, Ordering},
9 },
10};
11
12use super::worker::{Column, Worker, WorkerError};
13use super::{Indexed, Segmented};
14use crate::{SSS, SegmentableItem, SplitterFn};
15
16pub trait Injector {
17 type InputItem;
18 type Inner: Injector;
19 type Context;
20
21 fn new(injector: Self::Inner, data: Self::Context) -> Self;
22 fn inner(&self) -> &Self::Inner;
23 fn wrap(
24 &self,
25 item: Self::InputItem,
26 ) -> Result<<Self::Inner as Injector>::InputItem, WorkerError>;
27
28 fn push(&self, item: Self::InputItem) -> Result<(), WorkerError> {
29 let item = self.wrap(item)?;
30 self.inner().push(item)
31 }
32}
33
34impl Injector for () {
35 fn inner(&self) -> &Self::Inner {
36 unreachable!()
37 }
38 fn new(_: Self::Inner, _: Self::Context) -> Self {
39 unreachable!()
40 }
41 fn wrap(
42 &self,
43 _: Self::InputItem,
44 ) -> Result<<Self::Inner as Injector>::InputItem, WorkerError> {
45 unreachable!()
46 }
47
48 type Context = ();
49 type Inner = ();
50 type InputItem = ();
51}
52
53pub struct WorkerInjector<T> {
54 pub(super) inner: nucleo::Injector<T>,
55 pub(super) columns: Arc<[Column<T>]>,
56 pub(super) version: u32,
57 pub(super) picker_version: Arc<AtomicU32>,
58}
59
60impl<T: SSS> Injector for WorkerInjector<T> {
61 type InputItem = T;
62 type Inner = ();
63 type Context = Worker<T>;
64
65 fn new(_: Self::Inner, data: Self::Context) -> Self {
66 data.injector()
67 }
68
69 fn inner(&self) -> &Self::Inner {
70 &()
71 }
72
73 fn wrap(
74 &self,
75 _: Self::InputItem,
76 ) -> Result<<Self::Inner as Injector>::InputItem, WorkerError> {
77 Ok(())
78 }
79
80 fn push(&self, item: T) -> Result<(), WorkerError> {
81 if self.version != self.picker_version.load(Ordering::Relaxed) {
82 return Err(WorkerError::InjectorShutdown);
83 }
84 push_impl(&self.inner, &self.columns, item);
85 Ok(())
86 }
87}
88
89pub(super) fn push_impl<T>(injector: &nucleo::Injector<T>, columns: &[Column<T>], item: T) {
90 injector.push(item, |item, dst| {
91 for (column, text) in columns.iter().filter(|column| column.filter).zip(dst) {
92 *text = column.format_text(item).into()
93 }
94 });
95}
96
97#[derive(Clone)]
101pub struct IndexedInjector<T, I: Injector<InputItem = Indexed<T>>> {
102 injector: I,
103 counter: &'static AtomicU32,
104 input_type: PhantomData<T>,
105}
106
107impl<T, I: Injector<InputItem = Indexed<T>>> Injector for IndexedInjector<T, I> {
108 type InputItem = T;
109 type Inner = I;
110 type Context = &'static AtomicU32;
111
112 fn new(injector: Self::Inner, counter: Self::Context) -> Self {
113 Self {
114 injector,
115 counter,
116 input_type: PhantomData,
117 }
118 }
119
120 fn wrap(
121 &self,
122 item: Self::InputItem,
123 ) -> Result<<Self::Inner as Injector>::InputItem, WorkerError> {
124 let index = self.counter.fetch_add(1, Ordering::SeqCst);
125 Ok(Indexed { index, inner: item })
126 }
127
128 fn inner(&self) -> &Self::Inner {
129 &self.injector
130 }
131}
132
133static GLOBAL_COUNTER: AtomicU32 = AtomicU32::new(0);
134
135impl<T, I> IndexedInjector<T, I>
136where
137 I: Injector<InputItem = Indexed<T>>,
138{
139 pub fn new_globally_indexed(injector: <Self as Injector>::Inner) -> Self {
140 Self::global_reset();
141 Self::new(injector, &GLOBAL_COUNTER)
142 }
143
144 pub fn global_reset() {
145 GLOBAL_COUNTER.store(0, Ordering::SeqCst);
146 }
147}
148
149pub struct SegmentedInjector<T: SegmentableItem, I: Injector<InputItem = Segmented<T>>> {
150 injector: I,
151 splitter: SplitterFn<T>,
152 input_type: PhantomData<T>,
153}
154
155impl<T: SegmentableItem, I: Injector<InputItem = Segmented<T>>> Injector
156 for SegmentedInjector<T, I>
157{
158 type InputItem = T;
159 type Inner = I;
160 type Context = SplitterFn<T>;
161
162 fn new(injector: Self::Inner, data: Self::Context) -> Self {
163 Self {
164 injector,
165 splitter: data,
166 input_type: PhantomData,
167 }
168 }
169
170 fn wrap(
171 &self,
172 item: Self::InputItem,
173 ) -> Result<<Self::Inner as Injector>::InputItem, WorkerError> {
174 let ranges = (self.splitter)(&item);
175 Ok(Segmented {
176 inner: item,
177 ranges,
178 })
179 }
180
181 fn inner(&self) -> &Self::Inner {
182 &self.injector
183 }
184
185 fn push(&self, item: Self::InputItem) -> Result<(), WorkerError> {
186 let item = self.wrap(item)?;
187 self.inner().push(item)
188 }
189}
190
191impl<T> Clone for WorkerInjector<T> {
229 fn clone(&self) -> Self {
230 Self {
231 inner: self.inner.clone(),
232 columns: Arc::clone(&self.columns),
233 version: self.version,
234 picker_version: Arc::clone(&self.picker_version),
235 }
236 }
237}
238
239impl<T: SegmentableItem, I: Injector<InputItem = Segmented<T>> + Clone> Clone
240 for SegmentedInjector<T, I>
241{
242 fn clone(&self) -> Self {
243 Self {
244 injector: self.injector.clone(),
245 splitter: Arc::clone(&self.splitter),
246 input_type: PhantomData,
247 }
248 }
249}