diskann_benchmark_core/streaming/
api.rs1use std::any::Any;
7
8pub trait Stream<A>
30where
31 A: Arguments,
32{
33 type Output: 'static;
36
37 fn search(&mut self, args: A::Search<'_>) -> anyhow::Result<Self::Output>;
39
40 fn insert(&mut self, args: A::Insert<'_>) -> anyhow::Result<Self::Output>;
42
43 fn replace(&mut self, args: A::Replace<'_>) -> anyhow::Result<Self::Output>;
45
46 fn delete(&mut self, args: A::Delete<'_>) -> anyhow::Result<Self::Output>;
48
49 fn maintain(&mut self, args: A::Maintain<'_>) -> anyhow::Result<Self::Output>;
51
52 fn needs_maintenance(&mut self) -> bool;
55}
56
/// Bundle of argument types, one per [`Stream`] operation.
///
/// Each associated type is generic over a lifetime, so an implementation may
/// choose argument types that borrow data for the duration of a single call.
pub trait Arguments: 'static {
    /// Argument type passed to [`Stream::search`].
    type Search<'args>;
    /// Argument type passed to [`Stream::insert`].
    type Insert<'args>;
    /// Argument type passed to [`Stream::replace`].
    type Replace<'args>;
    /// Argument type passed to [`Stream::delete`].
    type Delete<'args>;
    /// Argument type passed to [`Stream::maintain`].
    type Maintain<'args>;
}
70
71pub trait Executor {
76 type Args: Arguments;
78
79 fn run_with<S, F, O>(&mut self, stream: &mut S, collect: F) -> anyhow::Result<()>
88 where
89 S: Stream<Self::Args, Output = O>,
90 O: 'static,
91 F: FnMut(O) -> anyhow::Result<()>;
92
93 fn run<S>(&mut self, stream: &mut S) -> anyhow::Result<Vec<S::Output>>
98 where
99 S: Stream<Self::Args>,
100 {
101 let mut outputs = Vec::new();
102 self.run_with(stream, |output| {
103 outputs.push(output);
104 Ok(())
105 })?;
106 Ok(outputs)
107 }
108}
109
/// Adaptor holding an exclusive borrow of a stream `T`.
///
/// Its [`Stream`] implementation forwards each call to the wrapped stream and
/// returns the result as a `Box<dyn Any>`.
#[derive(Debug)]
pub struct AnyStream<'a, T>(&'a mut T);

impl<'a, T> AnyStream<'a, T> {
    /// Wraps `stream`, borrowing it mutably for the lifetime of the adaptor.
    pub fn new(stream: &'a mut T) -> Self {
        AnyStream(stream)
    }
}
130
/// Moves `x` onto the heap and erases its concrete type behind `dyn Any`.
///
/// The value can be recovered later by downcasting to `T`.
fn boxed<T: Any>(x: T) -> Box<dyn Any> {
    Box::new(x) as Box<dyn Any>
}
137
138impl<A, T> Stream<A> for AnyStream<'_, T>
139where
140 A: Arguments,
141 T: Stream<A>,
142{
143 type Output = Box<dyn Any>;
144
145 fn search(&mut self, args: A::Search<'_>) -> anyhow::Result<Self::Output> {
146 self.0.search(args).map(boxed)
147 }
148
149 fn insert(&mut self, args: A::Insert<'_>) -> anyhow::Result<Self::Output> {
150 self.0.insert(args).map(boxed)
151 }
152
153 fn replace(&mut self, args: A::Replace<'_>) -> anyhow::Result<Self::Output> {
154 self.0.replace(args).map(boxed)
155 }
156
157 fn delete(&mut self, args: A::Delete<'_>) -> anyhow::Result<Self::Output> {
158 self.0.delete(args).map(boxed)
159 }
160
161 fn maintain(&mut self, args: A::Maintain<'_>) -> anyhow::Result<Self::Output> {
162 self.0.maintain(args).map(boxed)
163 }
164
165 fn needs_maintenance(&mut self) -> bool {
166 self.0.needs_maintenance()
167 }
168}
169
#[cfg(test)]
mod tests {
    use super::*;

    // Unit-struct argument markers, one per operation.
    struct Search;
    struct Insert;
    struct Replace;
    struct Delete;
    struct Maintain;

    /// Records which operation produced an output.
    #[derive(Debug, PartialEq)]
    enum Op {
        Search,
        Insert,
        Replace,
        Delete,
        Maintain,
    }

    /// Argument bundle wiring each operation to its marker type.
    struct TestArgs;

    impl Arguments for TestArgs {
        type Search<'a> = Search;
        type Insert<'a> = Insert;
        type Replace<'a> = Replace;
        type Delete<'a> = Delete;
        type Maintain<'a> = Maintain;
    }

    /// Stream whose operations report their own name as an `Op`.
    struct TestStream {
        needs_maintenance: bool,
    }

    impl TestStream {
        fn new(needs_maintenance: bool) -> Self {
            TestStream { needs_maintenance }
        }
    }

    impl Stream<TestArgs> for TestStream {
        type Output = Op;

        fn search(&mut self, _: Search) -> anyhow::Result<Self::Output> {
            Ok(Op::Search)
        }

        fn insert(&mut self, _: Insert) -> anyhow::Result<Self::Output> {
            Ok(Op::Insert)
        }

        fn replace(&mut self, _: Replace) -> anyhow::Result<Self::Output> {
            Ok(Op::Replace)
        }

        fn delete(&mut self, _: Delete) -> anyhow::Result<Self::Output> {
            Ok(Op::Delete)
        }

        fn maintain(&mut self, _: Maintain) -> anyhow::Result<Self::Output> {
            Ok(Op::Maintain)
        }

        fn needs_maintenance(&mut self) -> bool {
            self.needs_maintenance
        }
    }

    /// Executor issuing each operation exactly once, in a fixed order.
    struct TestExecutor;

    impl Executor for TestExecutor {
        type Args = TestArgs;

        fn run_with<S, F, O>(&mut self, stream: &mut S, mut collect: F) -> anyhow::Result<()>
        where
            S: Stream<Self::Args, Output = O>,
            O: 'static,
            F: FnMut(O) -> anyhow::Result<()>,
        {
            // Each step stops at the first error, from the stream or the callback.
            stream.search(Search).and_then(&mut collect)?;
            stream.insert(Insert).and_then(&mut collect)?;
            stream.replace(Replace).and_then(&mut collect)?;
            stream.delete(Delete).and_then(&mut collect)?;
            stream.maintain(Maintain).and_then(&mut collect)?;
            Ok(())
        }
    }

    #[test]
    fn test_executor_run() -> anyhow::Result<()> {
        let mut executor = TestExecutor;
        let mut stream = TestStream::new(false);

        let outputs = executor.run(&mut stream)?;

        // Comparing the whole vector checks both the count and the order.
        assert_eq!(
            outputs,
            [
                Op::Search,
                Op::Insert,
                Op::Replace,
                Op::Delete,
                Op::Maintain
            ]
        );

        Ok(())
    }

    #[test]
    fn test_any_stream() {
        let mut idle = TestStream::new(false);
        let mut any_stream = AnyStream::new(&mut idle);

        assert!(
            !any_stream.needs_maintenance(),
            "AnyStream should forward `needs_maintenance`"
        );

        // Every boxed output must downcast back to the `Op` the inner stream produced.
        let searched = any_stream.search(Search).unwrap();
        assert_eq!(searched.downcast_ref::<Op>(), Some(&Op::Search));

        let inserted = any_stream.insert(Insert).unwrap();
        assert_eq!(inserted.downcast_ref::<Op>(), Some(&Op::Insert));

        let replaced = any_stream.replace(Replace).unwrap();
        assert_eq!(replaced.downcast_ref::<Op>(), Some(&Op::Replace));

        let deleted = any_stream.delete(Delete).unwrap();
        assert_eq!(deleted.downcast_ref::<Op>(), Some(&Op::Delete));

        let maintained = any_stream.maintain(Maintain).unwrap();
        assert_eq!(maintained.downcast_ref::<Op>(), Some(&Op::Maintain));

        let mut pending = TestStream::new(true);
        let mut any_stream = AnyStream::new(&mut pending);
        assert!(
            any_stream.needs_maintenance(),
            "AnyStream should forward `needs_maintenance`"
        );
    }
}