matchmaker/nucleo/
injector.rs1use std::{
5 marker::PhantomData,
6 sync::{
7 Arc,
8 atomic::{AtomicU32, Ordering},
9 },
10};
11
12use super::worker::{Column, Worker, WorkerError};
13use super::{Indexed, Segmented};
14use crate::{SSS, SegmentableItem, SplitterFn};
15
16pub trait Injector {
17 type InputItem;
18 type Inner: Injector;
19 type Context;
20
21 fn new(injector: Self::Inner, data: Self::Context) -> Self;
22 fn inner(&self) -> &Self::Inner;
23 fn wrap(
24 &self,
25 item: Self::InputItem,
26 ) -> Result<<Self::Inner as Injector>::InputItem, WorkerError>;
27
28 fn push(&self, item: Self::InputItem) -> Result<(), WorkerError> {
29 let item = self.wrap(item)?;
30 self.inner().push(item)
31 }
32}
33
34impl Injector for () {
35 fn inner(&self) -> &Self::Inner {
36 unreachable!()
37 }
38 fn new(_: Self::Inner, _: Self::Context) -> Self {
39 unreachable!()
40 }
41 fn wrap(
42 &self,
43 _: Self::InputItem,
44 ) -> Result<<Self::Inner as Injector>::InputItem, WorkerError> {
45 unreachable!()
46 }
47
48 type Context = ();
49 type Inner = ();
50 type InputItem = ();
51}
52
53pub struct WorkerInjector<T> {
54 pub(super) inner: nucleo::Injector<T>,
55 pub(super) columns: Arc<[Column<T>]>,
56 pub(super) version: u32,
57 pub(super) picker_version: Arc<AtomicU32>,
58}
59
60impl<T: SSS> Injector for WorkerInjector<T> {
61 type InputItem = T;
62 type Inner = ();
63 type Context = Worker<T>;
64
65 fn new(_: Self::Inner, data: Self::Context) -> Self {
66 data.injector()
67 }
68
69 fn inner(&self) -> &Self::Inner {
70 &()
71 }
72
73 fn wrap(
74 &self,
75 _: Self::InputItem,
76 ) -> Result<<Self::Inner as Injector>::InputItem, WorkerError> {
77 Ok(())
78 }
79
80 fn push(&self, item: T) -> Result<(), WorkerError> {
81 if self.version != self.picker_version.load(Ordering::Relaxed) {
82 return Err(WorkerError::InjectorShutdown);
83 }
84 push_impl(&self.inner, &self.columns, item);
85 Ok(())
86 }
87}
88
89pub(super) fn push_impl<T>(injector: &nucleo::Injector<T>, columns: &[Column<T>], item: T) {
90 injector.push(item, |item, dst| {
91 for (column, text) in columns.iter().filter(|column| column.filter).zip(dst) {
92 *text = column.format_text(item).into()
93 }
94 });
95}
96
97#[derive(Clone)]
101pub struct IndexedInjector<T, I: Injector<InputItem = Indexed<T>>> {
102 injector: I,
103 counter: &'static AtomicU32,
104 input_type: PhantomData<T>,
105}
106
107impl<T, I: Injector<InputItem = Indexed<T>>> Injector for IndexedInjector<T, I> {
109 type InputItem = T;
110 type Inner = I;
111 type Context = &'static AtomicU32;
112
113 fn new(injector: Self::Inner, counter: Self::Context) -> Self {
114 Self {
115 injector,
116 counter,
117 input_type: PhantomData,
118 }
119 }
120
121 fn wrap(
122 &self,
123 item: Self::InputItem,
124 ) -> Result<<Self::Inner as Injector>::InputItem, WorkerError> {
125 let index = self.counter.fetch_add(1, Ordering::SeqCst);
126 Ok(Indexed { index, inner: item })
127 }
128
129 fn inner(&self) -> &Self::Inner {
130 &self.injector
131 }
132}
133
134static GLOBAL_COUNTER: AtomicU32 = AtomicU32::new(0);
135
136impl<T, I> IndexedInjector<T, I>
137where
138 I: Injector<InputItem = Indexed<T>>,
139{
140 pub fn new_globally_indexed(injector: <Self as Injector>::Inner) -> Self {
141 Self::global_reset();
142 Self::new(injector, &GLOBAL_COUNTER)
143 }
144
145 pub fn global_reset() {
146 GLOBAL_COUNTER.store(0, Ordering::SeqCst);
147 }
148}
149
150pub struct SegmentedInjector<T: SegmentableItem, I: Injector<InputItem = Segmented<T>>> {
151 injector: I,
152 splitter: SplitterFn<T>,
153 input_type: PhantomData<T>,
154}
155
156impl<T: SegmentableItem, I: Injector<InputItem = Segmented<T>>> Injector
157 for SegmentedInjector<T, I>
158{
159 type InputItem = T;
160 type Inner = I;
161 type Context = SplitterFn<T>;
162
163 fn new(injector: Self::Inner, data: Self::Context) -> Self {
164 Self {
165 injector,
166 splitter: data,
167 input_type: PhantomData,
168 }
169 }
170
171 fn wrap(
172 &self,
173 item: Self::InputItem,
174 ) -> Result<<Self::Inner as Injector>::InputItem, WorkerError> {
175 let ranges = (self.splitter)(&item);
176 Ok(Segmented {
177 inner: item,
178 ranges,
179 })
180 }
181
182 fn inner(&self) -> &Self::Inner {
183 &self.injector
184 }
185
186 fn push(&self, item: Self::InputItem) -> Result<(), WorkerError> {
187 let item = self.wrap(item)?;
188 self.inner().push(item)
189 }
190}
191
192impl<T> Clone for WorkerInjector<T> {
230 fn clone(&self) -> Self {
231 Self {
232 inner: self.inner.clone(),
233 columns: Arc::clone(&self.columns),
234 version: self.version,
235 picker_version: Arc::clone(&self.picker_version),
236 }
237 }
238}
239
240impl<T: SegmentableItem, I: Injector<InputItem = Segmented<T>> + Clone> Clone
241 for SegmentedInjector<T, I>
242{
243 fn clone(&self) -> Self {
244 Self {
245 injector: self.injector.clone(),
246 splitter: Arc::clone(&self.splitter),
247 input_type: PhantomData,
248 }
249 }
250}