super_mass/
lib.rs

1//!
2//!MASS: Mueen's Algorithm for Similarity Search in Rust!
3
4//! > Similarity search for time series subsequences is THE most important subroutine for time series pattern mining. Subsequence similarity search has been scaled to trillions obsetvations under both DTW (Dynamic Time Warping) and Euclidean distances \[a\]. The algorithms are ultra fast and efficient. The key technique that makes the algorithms useful is the Early Abandoning technique \[b,e\] known since 1994. However, the algorithms lack few properties that are useful for many time series data mining algorithms.
5
6//! > 1. Early abandoning depends on the dataset. The worst case complexity is still O(nm) where n is the length of the larger time series and m is the length of the short query.
7//! > 2. The algorithm can produce the most similar subsequence to the query and cannot produce the Distance Profile to all the subssequences given the query.
8//! > MASS is an algorithm to create Distance Profile of a query to a long time series. In this page we share a code for The Fastest Similarity Search Algorithm for Time Series Subsequences under Euclidean Distance. Early abandoning can occasionally beat this algorithm on some datasets for some queries. This algorithm is independent of data and query. The underlying concept of the algorithm is known for a long time to the signal processing community. We have used it for the first time on time series subsequence search under z-normalization. The algorithm was used as a subroutine in our papers \[c,d\] and the code are given below.
9//!
10//! > 1. The algorithm has an overall time complexity of O(n log n) which does not depend on datasets and is the lower bound of similarity search over time series subsequences.
11//! > 2. The algorithm produces all of the distances from the query to the subsequences of a long time series. In our recent paper, we generalize the usage of the distance profiles calculated using MASS in finding motifs, shapelets and discords.
12//!
13//! Excerpt taken from:
14
15//!```markdown
16//!@misc{
17//!FastestSimilaritySearch,
18//!title={The Fastest Similarity Search Algorithm for Time Series Subsequences under Euclidean Distance},
19//!author={ Mueen, Abdullah and Zhu, Yan and Yeh, Michael and Kamgar, Kaveh and Viswanathan, Krishnamurthy and Gupta, Chetan and Keogh, Eamonn},
20//!year={2017},
21//!month={August},
22//!note = {\url{http://www.cs.unm.edu/~mueen/FastestSimilaritySearch.html}}
23//!}
24//!```
25
26//!
27//!## Features
28//!
29//!`"jemalloc"` enable jemallocator as memory allocator.
30//!
31//!`"pseudo_distance"` simplifies the distance with the same optimization goal for increased performance.
32//!The distance output is no longer the MASS distance but a score with the same optimum.
33//!
34//!`"auto"` uses all logical cores to parallelize batch functions. Enabled by default. Disabling this feature exposes ['init_pool()`] to init the global thread pool.
35//!
36//! ## Panics
37//! TODO
38//! ## Examples
39
40//!```
41//!use rand::{thread_rng, Rng};
42//!
43//!let mut rng = thread_rng();
44//!let ts = (0..10_000).map(|_| rng.gen()).collect::<Vec<f64>>();
45//!let query = (0..500).map(|_| rng.gen()).collect::<Vec<f64>>();
46//!let res = super_mass::mass_batch(&ts[..], &query[..], 501, 3);
47//! //top_matches (only the best per batch considered) tuples of (index,distance score).
48//!dbg!(res);
49//!```
50
51//!```
52//!use rand::{thread_rng, Rng};
53//!
54//!let mut rng = thread_rng();
55//!let ts = (0..10_000).map(|_| rng.gen()).collect::<Vec<f64>>();
56//!let query = (0..500).map(|_| rng.gen()).collect::<Vec<f64>>();
57//!let res = super_mass::mass(&ts[..], &query[..]);
58//! //Complete distance profile
59//!dbg!(res);
60//!```
61
62#[cfg(all(not(target_env = "msvc"), feature = "jemallocator"))]
63use jemallocator::Jemalloc;
64
65#[cfg(all(not(target_env = "msvc"), feature = "jemallocator"))]
66#[global_allocator]
67static GLOBAL: Jemalloc = Jemalloc;
68
69use std::fmt::Debug;
70
71use rayon::iter::ParallelBridge;
72use rayon::prelude::*;
73use std::ops;
74
75#[cfg(not(feature = "auto"))]
76use num_cpus;
77pub mod math;
78pub mod stats;
79
80pub mod time_series;
81use math::argmin;
82use math::fft_mult;
83use stats::{mean, moving_avg as ma, moving_std as mstd, std};
84
85pub trait MassType:
86    PartialOrd + From<f64> + Into<f64> + Copy + ops::Add<f64> + Debug + Default + Sync
87{
88}
89
90/// compute the MASS distance and return the index and value of the minimum found.
91fn min_subsequence_distance<T>(start_idx: usize, subsequence: &[T], query: &[T]) -> (usize, f64)
92where
93    T: MassType,
94{
95    let distances = mass(subsequence, query);
96
97    //  find mininimum index of this batch which will be between 0 and batch_size
98    let min_idx = argmin(&distances);
99
100    // add the minimum distance found to the best distances
101    let dist = distances[min_idx];
102
103    // compute the global index
104    let index = min_idx + start_idx;
105
106    return (index, dist);
107}
108
109/// Compute the distance profile for the given query over the given time
110/// series.
111pub fn mass<T: Debug + Default>(ts: &[T], query: &[T]) -> Vec<f64>
112where
113    T: MassType,
114{
115    let n = ts.len();
116    let m = query.len();
117
118    debug_assert!(n >= m);
119
120    // mu and sigma for query
121    let mu_q = mean(query);
122    let sigma_q = std(query);
123
124    // Rolling mean and std for the time series
125    let rolling_mean_ts = ma(ts, m);
126
127    let rolling_sigma_ts = mstd(ts, m);
128
129    let z = fft_mult(&ts, &query);
130
131    let dist = math::dist(
132        mu_q,
133        sigma_q,
134        rolling_mean_ts,
135        rolling_sigma_ts,
136        n,
137        m,
138        &z[..],
139    );
140    dist
141}
142
143// need to try whether chunks over logical is faster than over physical cores SMT!!
144#[cfg(not(feature = "auto"))]
145fn cpus() -> usize {
146    num_cpus::get()
147}
148
149#[cfg(not(feature = "auto"))]
150use std::sync::Once;
151
152#[cfg(not(feature = "auto"))]
153static JOBS_SET: Once = Once::new();
154
155// Init global pool with [`jobs`] threads.
156#[cfg(not(feature = "auto"))]
157fn start_pool(jobs: usize) {
158    assert!(jobs > 0, "Job count must be at least 1.");
159    // silently use at max all available logical cpus
160    let jobs = jobs.min(cpus());
161    rayon::ThreadPoolBuilder::new()
162        .num_threads(jobs)
163        .build_global()
164        .unwrap();
165}
166
167// Initialize the threadpool with [`threads`] threads. This method will take effect once and
168//  must be called before the first call to [`mass_batch`]. Once the pool has been instantiated the threadpool is final.
169// The limitation on the global threadpool being final comes from the ['rayon'] dependency and is subject to change.
170#[cfg(not(feature = "auto"))]
171pub fn init_pool(threads: usize) {
172    JOBS_SET.call_once(|| start_pool(threads));
173}
174/// Masss batch finds top subsequence per batch the lowest distance profile for a given `query` and returns the top K subsequences.
175/// This behavior is useful when you want to filter adjacent suboptimal subsequences in each batch,
176/// where the local optimum overlaps with suboptima differing only by a few index strides.
177/// This method implements MASS V3 where chunks are split in powers of two and computed in parallel.
178/// Results are partitioned and not sorted, you can sort them afterwards if needed.
179pub fn mass_batch<T: MassType>(
180    ts: &[T],
181    query: &[T],
182    batch_size: usize,
183    top_matches: usize,
184) -> Vec<(usize, f64)> {
185    debug_assert!(batch_size > 0, "batch_size must be greater than 0.");
186    debug_assert!(top_matches > 0, "Match at least one.");
187
188    // TODO support nth top matches in parallel
189    // consider doing full nth top matches with a partition pseudosort per thread to ensure global optima.
190    let mut dists: Vec<_> = task_index(ts.len(), query.len(), batch_size)
191        .into_iter()
192        .par_bridge()
193        .map(|(l, h)| min_subsequence_distance(l, &ts[l..=h], query))
194        .collect();
195
196    assert!(
197        dists.len() >= top_matches,
198        format!(
199            "top_matches [{}] must be less or equal than the total batch count [{}], choose a smaller batch_size or less top_matches ",
200            top_matches,
201            dists.len()
202        )
203    );
204    dists.select_nth_unstable_by(top_matches - 1, |x, y| x.1.partial_cmp(&(y.1)).unwrap());
205
206    dists.iter().take(top_matches).copied().collect()
207}
208
209/// Generate the index for time series slices of size batch size; Batch size may be rounded to the nearest power of two.
210/// Rounding to the nearest power of two may panic! if the new batch size is greater than the time series' length.
211#[inline]
212fn task_index(
213    ts: usize,
214    query: usize,
215    mut batch_size: usize,
216) -> impl Iterator<Item = (usize, usize)> {
217    assert!(
218        batch_size > query,
219        "batch size must be greater than the query's length"
220    );
221
222    if !batch_size.is_power_of_two() {
223        batch_size = batch_size.next_power_of_two();
224    }
225
226    debug_assert!(
227        batch_size <= ts,
228        "batchsize after next power of two must be less or equal than series' length"
229    );
230    debug_assert!(
231        batch_size >= query,
232        "batchsize after next power of two must be greater or equal than query's length"
233    );
234
235    let step_size = batch_size - (query - 1);
236
237    let index = (0..ts - query)
238        .step_by(step_size)
239        .map(move |i| (i, (ts - 1).min(i + batch_size - 1)));
240    index
241}
242
243#[cfg(test)]
244pub mod tests {
245
246    use super::*;
247
248    #[test]
249    fn usize_div() {
250        assert_eq!(5usize / 2usize, 2);
251    }
252
253    // must run before any other call to [`mass_batch`] for it to pass. See [`init_pool`].
254    #[test]
255    #[cfg(not(feature = "auto"))]
256    fn init_tpool() {
257        let t = 4;
258        init_pool(t);
259        assert!(rayon::current_num_threads() == t);
260    }
261
262    #[test]
263    #[ignore = "for manual inspection purposes"]
264    fn jobs_range_0() {
265        let a = task_index(6, 2, 4);
266        for i in a {
267            print!("{:?}\n", i);
268        }
269    }
270
271    #[test]
272    fn jobs_range_1() {
273        let mut a = task_index(10, 4, 5);
274        assert!(a.next().unwrap() == (0, 7));
275        assert!(a.next().unwrap() == (5, 9));
276        assert!(a.next() == None);
277    }
278
279    #[test]
280    fn jobs_range_2() {
281        let mut a = task_index(6, 2, 4);
282        assert!(a.next().unwrap() == (0, 3));
283        assert!(a.next().unwrap() == (3, 5));
284        assert!(a.next() == None);
285    }
286
287    #[test]
288    fn jobs_range_3() {
289        let mut a = task_index(8, 2, 8);
290        assert!(a.next().unwrap() == (0, 7));
291        assert!(a.next() == None);
292    }
293    #[test]
294    fn jobs_range_4() {
295        let mut a = task_index(6, 3, 4);
296
297        assert!(a.next().unwrap() == (0, 3));
298        assert!(a.next().unwrap() == (2, 5));
299        assert!(a.next() == None);
300    }
301
302    #[test]
303    fn integration_1() {
304        let a = &[10., 3., 2., 3., 4.5, 6., 0., -1.];
305        let b = &[2., 3.];
306        let bsize = 4;
307        let c = mass_batch(a, b, bsize, 2);
308        println!("{:?}", c);
309        assert!(c[0].0 == 3);
310    }
311
312    #[test]
313    fn integration_2() {
314        let a = &[0., 10., 20., 30., 50., 10.];
315        let b = &[2., 3., 2.];
316        let c = mass_batch(a, b, 4, 1);
317        assert!(c[0].0 == 3);
318    }
319
320    //[´jobs´] greater that logical cores
321    #[test]
322    fn integration_3() {
323        let a = &[0., 10., 20., 30., 50., 10.];
324        let b = &[2., 3., 2.];
325        let c = mass_batch(a, b, 4, 1);
326        assert!(c[0].0 == 3);
327    }
328}