// diskann_benchmark_core/streaming/api.rs

/*
 * Copyright (c) Microsoft Corporation.
 * Licensed under the MIT license.
 */
5
use std::any::Any;
7
8/// A streaming interface for performing dynamic (streaming) operations on an index.
9///
10/// Streams are characterized by five operations:
11///
12/// * `search`: This is, after all, the whole reason for building an index.
13/// * `insert`: Insert new points into the index that do not already exist.
14/// * `replace`: Replace existing points in the index with new data.
15/// * `delete`: Remove points from the index.
16/// * `maintain`: Perform maintenance operations on the index. Examples may
17///   include fully removing deleted points from internal references.
18///
19/// This trait is parameterized by an [`Arguments`] proxy trait, which defines the
20/// argument types for each of the operations. The motivation here is to allow nesting
21/// of [`Stream`] implementations that progressively modify or adapt the arguments
22/// for better code reuse. An example of this is
23/// [`crate::streaming::executors::bigann::WithData`], which is a stream layer adapting
24/// the raw ranges used by [`crate::streaming::executors::bigann::RunBook`] into
25/// actual data slices.
26///
27/// Runners for [`Stream`]s use the [`Executor`] trait to invoke the stream operations
28/// in a structured way.
29pub trait Stream<A>
30where
31    A: Arguments,
32{
33    /// Output type for all operations. The `'static` is to allow results to be
34    /// aggregated in [`Any`] for type erasure in higher level [`Executor`]s.
35    type Output: 'static;
36
37    /// Perform a search operation.
38    fn search(&mut self, args: A::Search<'_>) -> anyhow::Result<Self::Output>;
39
40    /// Perform an insert operation.
41    fn insert(&mut self, args: A::Insert<'_>) -> anyhow::Result<Self::Output>;
42
43    /// Perform a replace operation.
44    fn replace(&mut self, args: A::Replace<'_>) -> anyhow::Result<Self::Output>;
45
46    /// Perform a delete operation.
47    fn delete(&mut self, args: A::Delete<'_>) -> anyhow::Result<Self::Output>;
48
49    /// Perform a maintain operation.
50    fn maintain(&mut self, args: A::Maintain<'_>) -> anyhow::Result<Self::Output>;
51
52    /// Indicate whether or not maintenance is needed. [`Executor`] implementations
53    /// are responsible periodically checking this.
54    fn needs_maintenance(&mut self) -> bool;
55}
56
/// Operation arguments to [`Stream`].
///
/// An implementor is a proxy type: it has no methods and only names the argument type
/// taken by each [`Stream`] operation. Each associated type carries a lifetime parameter
/// so that an argument may borrow data for the duration of a single call, while the
/// implementing type itself must be `'static`.
pub trait Arguments: 'static {
    /// Argument to [`Stream::search`].
    type Search<'a>;
    /// Argument to [`Stream::insert`].
    type Insert<'a>;
    /// Argument to [`Stream::replace`].
    type Replace<'a>;
    /// Argument to [`Stream::delete`].
    type Delete<'a>;
    /// Argument to [`Stream::maintain`].
    type Maintain<'a>;
}
70
71/// A sequential executor for [`Stream`]s.
72///
73/// Implementations invoke the operations of a [`Stream`] in a structured way (which should
74/// be reflected in the associated documentation) and aggregate the results.
75pub trait Executor {
76    /// The argument collection type for the underlying [`Stream`].
77    type Args: Arguments;
78
79    /// Execute a series of operations on `stream`. As outputs are produced, they will be
80    /// passed to `collect` for aggregation.
81    ///
82    /// Since dynamic execution may be long-running, this allows implementations of `collect`
83    /// to perform operations like status updates or partial saving of results as they are
84    /// generated.
85    ///
86    /// See also: [`Executor::run`].
87    fn run_with<S, F, O>(&mut self, stream: &mut S, collect: F) -> anyhow::Result<()>
88    where
89        S: Stream<Self::Args, Output = O>,
90        O: 'static,
91        F: FnMut(O) -> anyhow::Result<()>;
92
93    /// Execute a series of operations on `stream`. The outputs of each operation will be
94    /// collected in the retuned `Vec` in-order.
95    ///
96    /// See also: [`Executor::run_with`].
97    fn run<S>(&mut self, stream: &mut S) -> anyhow::Result<Vec<S::Output>>
98    where
99        S: Stream<Self::Args>,
100    {
101        let mut outputs = Vec::new();
102        self.run_with(stream, |output| {
103            outputs.push(output);
104            Ok(())
105        })?;
106        Ok(outputs)
107    }
108}
109
/// A type-erased [`Stream`] implementation that wraps stream [outputs](Stream::Output) in
/// [`Box<dyn Any>`].
///
/// This is useful as the final layer in a stack of nested [`Stream`]s that allows the top
/// level [`Executor`] to operate without knowledge of the concrete output types.
///
/// From a performance perspective, this is usually fine since
///
/// 1. Individual [`Stream`] operations are typically expensive relative to the cost of boxing.
/// 2. Many concrete [`Stream`] implementations will use the same [`Executor`] implementation.
///    Thus, boxing can help reduce code bloat without significant performance impact.
///
/// The wrapper holds a mutable borrow of the inner stream rather than owning it, so the
/// inner stream is usable again once the wrapper is dropped.
#[derive(Debug)]
pub struct AnyStream<'a, T>(&'a mut T);

impl<'a, T> AnyStream<'a, T> {
    /// Wrap `stream` in an [`AnyStream`].
    ///
    /// No bound is placed on `T` here; the [`Stream`] requirement only applies to the
    /// trait implementation.
    pub fn new(stream: &'a mut T) -> Self {
        Self(stream)
    }
}
130
/// Move `x` onto the heap and erase its concrete type.
///
/// The original value can be recovered from the returned box with
/// `downcast`/`downcast_ref` using the same type `T`.
fn boxed<T>(x: T) -> Box<dyn Any>
where
    T: Any,
{
    Box::new(x)
}
137
138impl<A, T> Stream<A> for AnyStream<'_, T>
139where
140    A: Arguments,
141    T: Stream<A>,
142{
143    type Output = Box<dyn Any>;
144
145    fn search(&mut self, args: A::Search<'_>) -> anyhow::Result<Self::Output> {
146        self.0.search(args).map(boxed)
147    }
148
149    fn insert(&mut self, args: A::Insert<'_>) -> anyhow::Result<Self::Output> {
150        self.0.insert(args).map(boxed)
151    }
152
153    fn replace(&mut self, args: A::Replace<'_>) -> anyhow::Result<Self::Output> {
154        self.0.replace(args).map(boxed)
155    }
156
157    fn delete(&mut self, args: A::Delete<'_>) -> anyhow::Result<Self::Output> {
158        self.0.delete(args).map(boxed)
159    }
160
161    fn maintain(&mut self, args: A::Maintain<'_>) -> anyhow::Result<Self::Output> {
162        self.0.maintain(args).map(boxed)
163    }
164
165    fn needs_maintenance(&mut self) -> bool {
166        self.0.needs_maintenance()
167    }
168}
169
///////////
// Tests //
///////////
173
#[cfg(test)]
mod tests {
    use super::*;

    // The main thing we're testing here is the implementation of [`Executor::run`] and
    // to a lesser extent `AnyStream`.

    // Unit argument types, one per operation.
    struct Search;
    struct Insert;
    struct Replace;
    struct Delete;
    struct Maintain;

    /// Output of `TestStream`: records which operation produced it.
    #[derive(Debug, PartialEq)]
    enum Op {
        Search,
        Insert,
        Replace,
        Delete,
        Maintain,
    }

    struct TestArgs;

    impl Arguments for TestArgs {
        type Search<'a> = Search;
        type Insert<'a> = Insert;
        type Replace<'a> = Replace;
        type Delete<'a> = Delete;
        type Maintain<'a> = Maintain;
    }

    /// A stream whose operations always succeed and return the matching `Op`.
    struct TestStream {
        needs_maintenance: bool,
    }

    impl TestStream {
        fn new(needs_maintenance: bool) -> Self {
            Self { needs_maintenance }
        }
    }

    impl Stream<TestArgs> for TestStream {
        type Output = Op;

        fn search(&mut self, _args: Search) -> anyhow::Result<Self::Output> {
            Ok(Op::Search)
        }

        fn insert(&mut self, _args: Insert) -> anyhow::Result<Self::Output> {
            Ok(Op::Insert)
        }

        fn replace(&mut self, _args: Replace) -> anyhow::Result<Self::Output> {
            Ok(Op::Replace)
        }

        fn delete(&mut self, _args: Delete) -> anyhow::Result<Self::Output> {
            Ok(Op::Delete)
        }

        fn maintain(&mut self, _args: Maintain) -> anyhow::Result<Self::Output> {
            Ok(Op::Maintain)
        }

        fn needs_maintenance(&mut self) -> bool {
            self.needs_maintenance
        }
    }

    /// An executor that invokes each operation exactly once, in a fixed order.
    struct TestExecutor;

    impl Executor for TestExecutor {
        type Args = TestArgs;

        fn run_with<S, F, O>(&mut self, stream: &mut S, mut collect: F) -> anyhow::Result<()>
        where
            S: Stream<Self::Args, Output = O>,
            O: 'static,
            F: FnMut(O) -> anyhow::Result<()>,
        {
            collect(stream.search(Search)?)?;
            collect(stream.insert(Insert)?)?;
            collect(stream.replace(Replace)?)?;
            collect(stream.delete(Delete)?)?;
            collect(stream.maintain(Maintain)?)?;
            Ok(())
        }
    }

    #[test]
    fn test_executor_run() -> anyhow::Result<()> {
        let mut stream = TestStream::new(false);
        let mut executor = TestExecutor;

        let outputs = executor.run(&mut stream)?;

        // Comparing the whole vector checks both length and order, and prints the
        // actual sequence on failure.
        assert_eq!(
            outputs,
            vec![
                Op::Search,
                Op::Insert,
                Op::Replace,
                Op::Delete,
                Op::Maintain,
            ]
        );

        Ok(())
    }

    #[test]
    fn test_any_stream() {
        let mut stream = TestStream::new(false);
        let mut any_stream = AnyStream::new(&mut stream);

        assert!(
            !any_stream.needs_maintenance(),
            "AnyStream should forward `needs_maintenance`"
        );
        assert_eq!(
            any_stream
                .search(Search)
                .unwrap()
                .downcast_ref::<Op>()
                .unwrap(),
            &Op::Search
        );
        assert_eq!(
            any_stream
                .insert(Insert)
                .unwrap()
                .downcast_ref::<Op>()
                .unwrap(),
            &Op::Insert
        );
        assert_eq!(
            any_stream
                .replace(Replace)
                .unwrap()
                .downcast_ref::<Op>()
                .unwrap(),
            &Op::Replace
        );
        assert_eq!(
            any_stream
                .delete(Delete)
                .unwrap()
                .downcast_ref::<Op>()
                .unwrap(),
            &Op::Delete
        );
        assert_eq!(
            any_stream
                .maintain(Maintain)
                .unwrap()
                .downcast_ref::<Op>()
                .unwrap(),
            &Op::Maintain
        );

        // A second stream with the flag set checks that `true` is forwarded as well.
        let mut stream = TestStream::new(true);
        let mut any_stream = AnyStream::new(&mut stream);
        assert!(
            any_stream.needs_maintenance(),
            "AnyStream should forward `needs_maintenance`"
        );
    }
}