1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
//! Generic Push-Pull Solver.
//!
//! This crate implements a generic solver for anything that can have a clear dependency graph.
//! The implementation is a mix of push (eager) and pull (lazy) architectures with user-driven
//! recursion.
//!
//! Functionality is centered on the [`Solver`] struct. Users record all *fragments* they want to
//! evaluate and only those.  What *is* a fragment is arbitrary and the solver does not care. It
//! may represent a variable, an action, an object, or anything else.
//!
//! Users must also implement the [`Problem`] trait, which defines a dependency graph and an
//! interface for evaluating fragments that the solver finds are both solvable and required. This
//! dependency graph does not need to be complete or explicit, as long as implementors can return
//! the direct dependencies of fragments as the solver explores this graph.
//!
//! [`Solver::run`] and [`Solver::step`] will incrementally explore the dependency graph, calling
//! [`Problem::evaluate`] on fragments that have all of their dependencies met.
//!
//! In the end, all requested fragments will either have been evaluated or some of those will have
//! been permanently punted (see next paragraph) due to being a part of a dependency cycle. The
//! user may choose to report cycles as errors, or break them with [`Solver::assume_evaluated`] or
//! [`Solver::clone_with_evaluation_assumptions`]. See also [`Solver::status`].
//!
//! [`Solver::punted_iter`] will return an iterator yielding all fragments that have been *punted*
//! so far. A punted fragment is one that has been considered for evaluation but its dependencies
//! haven't been met yet. If the solver is done, punted fragments must be part of at least one
//! cycle.
//!
//! # Concurrency
//!
//! [`Solver`] is fully asynchronous but the core algorithm is not parallel at the moment. Running
//! multiple [`Solver::step`] concurrently or calling [`Solver::run`] with `concurrency > 1` will
//! not make the solver itself run faster. What this does allow is for multiple
//! [`Problem::direct_dependencies`] and [`Problem::evaluate`] calls to run concurrently.
//!
//! # Internals
//!
//! [`Solver`] implements a hybrid push-pull architecture. Fragments are only evaluated if needed
//! (pull, lazy evaluation), but instead of evaluating dependencies recursively, this process will
//! only evaluate fragments that already have all of their *direct* dependencies met. If that's not
//! the case, the fragment will be *punted*: stored away and only considered again if *all* its
//! dependencies are met sometime in the future.
//!
//! On the other hand, if a fragment is successfully evaluated, punted fragments that depend on it
//! will be evaluated eagerly (push) if all other dependencies have also been evaluated.
//!
//! This architecture has two major advantages:
//!
//! - It is lazy. Only fragments that are explicitly requested to be evaluated, and the fragments
//!   those depend on, will be evaluated. And never more than once.
//! - There is no need to explicitly detect or handle cycles, unlike both pure push and pure
//!   pull. Fragments that are part of cycles will naturally be punted and never considered
//!   again, unless the cycle is explicitly broken with [`Solver::assume_evaluated`] or
//!   [`Solver::clone_with_evaluation_assumptions`].

#![cfg_attr(not(feature = "std"), no_std)]

// Gate a group of items on a Cargo feature.
//
// - `feature_cfg! { for "name"; <items> }` keeps the items only when feature `name` is enabled.
// - `feature_cfg! { for !"name"; <items> }` keeps the items only when feature `name` is disabled.
//
// Every item in the group receives its own `#[cfg(...)]` attribute.
macro_rules! feature_cfg {
    (for !$feature:literal; $($gated:item)*) => {
        $(
            #[cfg(not(feature = $feature))]
            $gated
        )*
    };
    (for $feature:literal; $($gated:item)*) => {
        $(
            #[cfg(feature = $feature)]
            $gated
        )*
    };
}

use async_trait::async_trait;
use derive_more::{From, Into};
use futures::stream::{FuturesUnordered, StreamExt};
use reexported::{iter, Box, Map, Mutex, NonZeroUsize, Set, Vec};

pub mod reexported;

#[cfg(test)]
mod test;

/// Trait implemented by objects that define a specific problem to be solved by the [`Solver`].
///
/// Implementors describe the dependency graph one fragment at a time through
/// [`Problem::direct_dependencies`] and perform the actual work in [`Problem::evaluate`].
///
/// Use [`mod@async_trait`] to implement this trait.
#[async_trait]
pub trait Problem {
    /// Error type for [`Problem::evaluate`].
    type Error;

    /// Fill `dependencies` with the direct dependencies of `id`. The output vector is guaranteed
    /// to be empty when this method is called.
    ///
    /// Dependencies that the solver already considers solved are discarded by the solver, so
    /// implementors do not need to filter them out.
    ///
    /// This method may be called more than once for the same fragment: a fragment that was
    /// punted is asked for its dependencies again when it is reconsidered.
    async fn direct_dependencies(
        &self,
        id: FragmentId,
        dependencies: &mut Vec<FragmentId>,
    );

    /// Called by the solver to signal that a fragment has had all of its dependencies evaluated
    /// and thus the fragment should be evaluated too.
    ///
    /// See [`Solver::run`] and [`Solver::step`] on how evaluation failures are handled.
    ///
    /// This method is never called more than once with the same fragment.
    ///
    /// # Errors
    ///
    /// Returns [`Problem::Error`] if the fragment could not be evaluated. The solver does not
    /// mark the fragment as solved in that case and hands the error back to its caller.
    async fn evaluate(&self, id: FragmentId) -> Result<(), Self::Error>;
}

/// Identifier of a fragment.
///
/// A thin wrapper around `usize`. What the number refers to is entirely up to the [`Problem`]
/// implementation; the solver only compares, orders and hashes it.
// TODO: allow `Problem` implementors to define their own ID type
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[derive(From, Into)]
pub struct FragmentId(pub usize);

/// Hybrid push-pull solver.
///
/// Wraps a [`Problem`] instance together with the bookkeeping needed to decide which fragments
/// can be evaluated next. All methods take `&self`; mutable state lives behind async mutexes.
pub struct Solver<P> {
    // Queue, punted set and solved set; see `State`
    state: Mutex<State>,
    // This is a scratch vector we store here to reduce allocations
    //
    // `Solver::step` holds this lock while it awaits `Problem::direct_dependencies`, so those
    // calls do not overlap even when several steps are in flight.
    dependencies: Mutex<Vec<FragmentId>>,
    // The user-supplied problem; handed back by `Solver::into_problem_instance`
    problem_instance: P,
}

// POD struct
//
// Bookkeeping shared by every `Solver` method. It is only read or modified while the
// `Solver::state` lock is held.
struct State {
    // TODO: these should be an intrusive copy-on-write to make cloning and testing alternatives
    // cheap

    // Fragments queued for consideration by `Solver::step`
    to_solve: Set<FragmentId>,
    // For each unsolved dependency, the punted fragments waiting on it. A fragment is listed
    // once per occurrence of the dependency in its dependency list.
    pending_on: Map<FragmentId, Vec<FragmentId>>,
    // For each punted fragment, how many of its dependencies are still unsolved
    punted: Map<FragmentId, usize>,
    // Fragments that were evaluated successfully or assumed evaluated
    solved: Set<FragmentId>,
}

impl<P> Solver<P> {
    /// Build a [`Solver`] around `problem_instance` with nothing queued, punted or solved.
    pub fn new(problem_instance: P) -> Self {
        let empty_state = State {
            to_solve: Set::new(),
            pending_on: Map::new(),
            punted: Map::new(),
            solved: Set::new(),
        };

        Self {
            state: Mutex::new(empty_state),
            dependencies: Mutex::new(Vec::new()),
            problem_instance,
        }
    }

    /// Consume the solver and hand back the wrapped [`Problem`] instance. All solver bookkeeping
    /// is discarded.
    pub fn into_problem_instance(self) -> P {
        let Self {
            problem_instance, ..
        } = self;

        problem_instance
    }

    /// Report the current [`Status`] of the solver.
    ///
    /// The solver is [`Status::Pending`] while fragments are queued. Once the queue is empty it
    /// is [`Status::Done`] if nothing is punted and [`Status::DoneWithCycles`] otherwise.
    pub async fn status(&self) -> Status {
        let state = self.state.lock().await;

        match (state.to_solve.is_empty(), state.punted.is_empty()) {
            (false, _) => Status::Pending,
            (true, true) => Status::Done,
            (true, false) => Status::DoneWithCycles,
        }
    }

    /// Queue a fragment so that the solver considers it for evaluation.
    ///
    /// Only fragments queued through this method, plus their transitive dependencies, are ever
    /// considered. Returns `self` so that calls can be chained.
    pub async fn enqueue_fragment(&self, id: FragmentId) -> &Self {
        {
            let mut state = self.state.lock().await;
            state.to_solve.insert(id);
        }

        self
    }

    /// Get a snapshot of all fragments that are currently punted, collected into a vector.
    /// What a punted fragment means depends on the current [status](Solver::status):
    ///
    /// - [`Status::Pending`]: fragments are pending on dependencies.
    /// - [`Status::DoneWithCycles`]: fragments are part of one or more cycles.
    /// - [`Status::Done`]: the returned vector will be empty.
    pub async fn punted_iter(&self) -> Vec<FragmentId> {
        let state = self.state.lock().await;

        state.punted.keys().copied().collect()
    }
}

impl<P> Solver<P>
where
    P: Problem,
{
    /// Assume the given fragment is already evaluated.
    ///
    /// The fragment is recorded as solved without calling [`Problem::evaluate`], and punted
    /// fragments whose last unsolved dependency was `id` are queued for consideration again.
    /// This is how a dependency cycle reported by [`Solver::punted_iter`] can be broken.
    ///
    /// Returns `self` so that calls can be chained.
    pub async fn assume_evaluated(&self, id: FragmentId) -> &Self {
        self.mark_solved(id, &mut *self.state.lock().await);

        self
    }

    /* TODO: rethink about cloning in general
    /// Create a clone of `self` that assumes some fragments are already evaluated.
    ///
    /// This method is useful for trying out assumptions that may need to be discarded.
    pub async fn clone_with_evaluation_assumptions<A>(
        &self,
        assume_evaluated: A,
    ) -> Self
    where
        A: IntoIterator<Item = FragmentId>,
        P: Clone,
    {
        let clone = self.clone();
        for id in assume_evaluated {
            clone.assume_evaluated(id).await;
        }

        clone
    }
    */

    /// Run the solver until all enqueued fragments and their transitive dependencies are either
    /// solved or proven to be part of cycles. See the module docs for the limitations when
    /// `concurrency > 1`.
    ///
    /// At most `concurrency` calls to [`Solver::step`] are kept in flight. A step that finds
    /// nothing to do is not replaced right away, but the solver keeps waiting for the steps that
    /// are still running and schedules new ones as soon as any of them reports progress. That
    /// way fragments unblocked by a slow evaluation are still picked up.
    ///
    /// Returns a vector with all fragments that are part of at least one cycle, if any. See
    /// [`Solver::punted_iter`].
    ///
    /// # Errors
    ///
    /// Returns the first error produced by [`Problem::evaluate`]. Steps that are still in flight
    /// at that point are dropped.
    ///
    /// # Known Issues
    ///
    /// - If [`Solver::enqueue_fragment`] is called while [`Solver::run`] is executing, those new
    ///   fragments may not be solved.
    /// - If [`Solver::run`] returns with an error, the [`Solver`] may be left in an inconsistent
    ///   state.
    pub async fn run(
        &self,
        concurrency: NonZeroUsize,
    ) -> Result<Vec<FragmentId>, P::Error> {
        let limit: usize = concurrency.into();
        let mut steps = iter::repeat_with(|| self.step())
            .take(limit)
            .collect::<FuturesUnordered<_>>();

        // The loop ends when no step is in flight any more, which can only happen after every
        // remaining step found the queue empty
        while let Some(progressed) = steps.next().await {
            if progressed? {
                // Progress may have queued new fragments (dependencies of a punted fragment or
                // dependents of a solved one), so top the pool back up to `concurrency` steps
                while steps.len() < limit {
                    steps.push(self.step());
                }
            }
        }

        Ok(self.punted_iter().await)
    }

    /// Run a single solver step for a single fragment.
    ///
    /// One queued fragment is taken and its direct dependencies are requested from the
    /// [`Problem`]. If all of them are solved the fragment is evaluated, otherwise it is punted
    /// and its unsolved dependencies are queued.
    ///
    /// Returns `false` if there are no more fragments that can be evaluated.
    ///
    /// # Errors
    ///
    /// Returns an error if [`Problem::evaluate`] was called and evaluation returned an error.
    /// The fragment is not marked as solved in that case.
    ///
    /// # Known Issues
    ///
    /// - If [`Solver::step`] is not run to completion the [`Solver`] may be left in an
    ///   inconsistent state.
    pub async fn step(&self) -> Result<bool, P::Error> {
        let item = {
            let mut state = self.state.lock().await;

            let item = state
                .to_solve
                .iter()
                .next()
                .copied()
                .map(|x| state.to_solve.take(&x).unwrap());

            match item {
                // A fragment that is already punted (e.g. it was enqueued again by the user)
                // is registered with its dependencies and will be queued automatically once
                // they are solved. Punting it a second time would duplicate its `pending_on`
                // entries and corrupt the counters in `punted`.
                Some(id) if state.punted.contains_key(&id) => return Ok(true),
                _ => item,
            }
        };

        let id = match item {
            Some(id) => id,
            None => return Ok(false),
        };

        let mut dependencies = self.dependencies.lock().await;
        dependencies.clear();
        self.problem_instance
            .direct_dependencies(id, &mut dependencies)
            .await;
        let mut state = self.state.lock().await;
        // Only dependencies that are not solved yet can block this fragment
        dependencies.retain(|x| !state.solved.contains(x));

        if !dependencies.is_empty() {
            self.mark_punted(id, &dependencies, &mut state);

            return Ok(true);
        }

        // Drop all locks before calling `evaluate` to allow other calls to `step` to progress
        // while `evaluate` is running. And we only need to lock `self.state` again if
        // `evaluate` is successful
        drop(dependencies);
        drop(state);

        self.problem_instance.evaluate(id).await?;

        // TODO: take a deeper look here to make sure there are no possible race condition
        // between dropping the state lock and locking it again here. While `evaluate` runs the
        // fragment is in none of `to_solve`, `punted` or `solved`, so a concurrent `step` that
        // punts one of its dependents will queue it again.
        self.mark_solved(id, &mut *self.state.lock().await);

        Ok(true)
    }

    /// Record `id` as solved and wake up the punted fragments that were waiting on it.
    ///
    /// Every dependent has its counter of unsolved dependencies decremented. Dependents that
    /// reach zero are removed from the punted set and queued for consideration.
    fn mark_solved(&self, id: FragmentId, state: &mut State) {
        state.solved.insert(id);

        if let Some(dependents) = state.pending_on.remove(&id) {
            for dependent in dependents {
                let remaining = state
                    .punted
                    .get_mut(&dependent)
                    .expect("fragments listed in `pending_on` must be punted");
                *remaining -= 1;

                if *remaining == 0 {
                    state.punted.remove(&dependent);
                    state.to_solve.insert(dependent);
                }
            }
        }
    }

    /// Record `id` as punted until all of `dependencies` are solved.
    ///
    /// `dependencies` must only contain unsolved fragments. Dependencies that are neither solved
    /// nor punted themselves are queued, which is how the solver pulls in transitive
    /// dependencies.
    fn mark_punted(
        &self,
        id: FragmentId,
        dependencies: &[FragmentId],
        state: &mut State,
    ) {
        // The counter includes duplicates, matching the one `pending_on` entry pushed per
        // occurrence below
        state.punted.insert(id, dependencies.len());

        for dependency in dependencies.iter().copied() {
            if dependency != id
                && !state.solved.contains(&dependency)
                && !state.punted.contains_key(&dependency)
            {
                state.to_solve.insert(dependency);
            }
            state.pending_on.entry(dependency).or_default().push(id);
        }
    }
}

/// Current status of a [`Solver`] instance.
///
/// Computed by [`Solver::status`] from the solver's queue and its set of punted fragments.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Status {
    /// All fragments have been successfully evaluated.
    ///
    /// Reported when no fragment is queued and no fragment is punted.
    Done,

    /// All fragments that could be evaluated were evaluated, but there are still some that were
    /// not due to being part of one or more dependency cycles.
    ///
    /// Reported when no fragment is queued but some fragments are still punted.
    DoneWithCycles,

    /// The solver is still running and there are still fragments that may be evaluated.
    ///
    /// Reported whenever at least one fragment is queued.
    Pending,
}