super_mass/lib.rs
1//!
2//!MASS: Mueen's Algorithm for Similarity Search in Rust!
3
4//! > Similarity search for time series subsequences is THE most important subroutine for time series pattern mining. Subsequence similarity search has been scaled to trillions obsetvations under both DTW (Dynamic Time Warping) and Euclidean distances \[a\]. The algorithms are ultra fast and efficient. The key technique that makes the algorithms useful is the Early Abandoning technique \[b,e\] known since 1994. However, the algorithms lack few properties that are useful for many time series data mining algorithms.
5
6//! > 1. Early abandoning depends on the dataset. The worst case complexity is still O(nm) where n is the length of the larger time series and m is the length of the short query.
7//! > 2. The algorithm can produce the most similar subsequence to the query and cannot produce the Distance Profile to all the subssequences given the query.
8//! > MASS is an algorithm to create Distance Profile of a query to a long time series. In this page we share a code for The Fastest Similarity Search Algorithm for Time Series Subsequences under Euclidean Distance. Early abandoning can occasionally beat this algorithm on some datasets for some queries. This algorithm is independent of data and query. The underlying concept of the algorithm is known for a long time to the signal processing community. We have used it for the first time on time series subsequence search under z-normalization. The algorithm was used as a subroutine in our papers \[c,d\] and the code are given below.
9//!
10//! > 1. The algorithm has an overall time complexity of O(n log n) which does not depend on datasets and is the lower bound of similarity search over time series subsequences.
11//! > 2. The algorithm produces all of the distances from the query to the subsequences of a long time series. In our recent paper, we generalize the usage of the distance profiles calculated using MASS in finding motifs, shapelets and discords.
12//!
13//! Excerpt taken from:
14
15//!```markdown
16//!@misc{
17//!FastestSimilaritySearch,
18//!title={The Fastest Similarity Search Algorithm for Time Series Subsequences under Euclidean Distance},
19//!author={ Mueen, Abdullah and Zhu, Yan and Yeh, Michael and Kamgar, Kaveh and Viswanathan, Krishnamurthy and Gupta, Chetan and Keogh, Eamonn},
20//!year={2017},
21//!month={August},
22//!note = {\url{http://www.cs.unm.edu/~mueen/FastestSimilaritySearch.html}}
23//!}
24//!```
25
26//!
27//!## Features
28//!
29//!`"jemalloc"` enable jemallocator as memory allocator.
30//!
31//!`"pseudo_distance"` simplifies the distance with the same optimization goal for increased performance.
32//!The distance output is no longer the MASS distance but a score with the same optimum.
33//!
34//!`"auto"` uses all logical cores to parallelize batch functions. Enabled by default. Disabling this feature exposes ['init_pool()`] to init the global thread pool.
35//!
36//! ## Panics
37//! TODO
38//! ## Examples
39
40//!```
41//!use rand::{thread_rng, Rng};
42//!
43//!let mut rng = thread_rng();
44//!let ts = (0..10_000).map(|_| rng.gen()).collect::<Vec<f64>>();
45//!let query = (0..500).map(|_| rng.gen()).collect::<Vec<f64>>();
46//!let res = super_mass::mass_batch(&ts[..], &query[..], 501, 3);
47//! //top_matches (only the best per batch considered) tuples of (index,distance score).
48//!dbg!(res);
49//!```
50
51//!```
52//!use rand::{thread_rng, Rng};
53//!
54//!let mut rng = thread_rng();
55//!let ts = (0..10_000).map(|_| rng.gen()).collect::<Vec<f64>>();
56//!let query = (0..500).map(|_| rng.gen()).collect::<Vec<f64>>();
57//!let res = super_mass::mass(&ts[..], &query[..]);
58//! //Complete distance profile
59//!dbg!(res);
60//!```
61
62#[cfg(all(not(target_env = "msvc"), feature = "jemallocator"))]
63use jemallocator::Jemalloc;
64
65#[cfg(all(not(target_env = "msvc"), feature = "jemallocator"))]
66#[global_allocator]
67static GLOBAL: Jemalloc = Jemalloc;
68
69use std::fmt::Debug;
70
71use rayon::iter::ParallelBridge;
72use rayon::prelude::*;
73use std::ops;
74
75#[cfg(not(feature = "auto"))]
76use num_cpus;
77pub mod math;
78pub mod stats;
79
80pub mod time_series;
81use math::argmin;
82use math::fft_mult;
83use stats::{mean, moving_avg as ma, moving_std as mstd, std};
84
85pub trait MassType:
86 PartialOrd + From<f64> + Into<f64> + Copy + ops::Add<f64> + Debug + Default + Sync
87{
88}
89
90/// compute the MASS distance and return the index and value of the minimum found.
91fn min_subsequence_distance<T>(start_idx: usize, subsequence: &[T], query: &[T]) -> (usize, f64)
92where
93 T: MassType,
94{
95 let distances = mass(subsequence, query);
96
97 // find mininimum index of this batch which will be between 0 and batch_size
98 let min_idx = argmin(&distances);
99
100 // add the minimum distance found to the best distances
101 let dist = distances[min_idx];
102
103 // compute the global index
104 let index = min_idx + start_idx;
105
106 return (index, dist);
107}
108
109/// Compute the distance profile for the given query over the given time
110/// series.
111pub fn mass<T: Debug + Default>(ts: &[T], query: &[T]) -> Vec<f64>
112where
113 T: MassType,
114{
115 let n = ts.len();
116 let m = query.len();
117
118 debug_assert!(n >= m);
119
120 // mu and sigma for query
121 let mu_q = mean(query);
122 let sigma_q = std(query);
123
124 // Rolling mean and std for the time series
125 let rolling_mean_ts = ma(ts, m);
126
127 let rolling_sigma_ts = mstd(ts, m);
128
129 let z = fft_mult(&ts, &query);
130
131 let dist = math::dist(
132 mu_q,
133 sigma_q,
134 rolling_mean_ts,
135 rolling_sigma_ts,
136 n,
137 m,
138 &z[..],
139 );
140 dist
141}
142
143// need to try whether chunks over logical is faster than over physical cores SMT!!
144#[cfg(not(feature = "auto"))]
145fn cpus() -> usize {
146 num_cpus::get()
147}
148
149#[cfg(not(feature = "auto"))]
150use std::sync::Once;
151
152#[cfg(not(feature = "auto"))]
153static JOBS_SET: Once = Once::new();
154
155// Init global pool with [`jobs`] threads.
156#[cfg(not(feature = "auto"))]
157fn start_pool(jobs: usize) {
158 assert!(jobs > 0, "Job count must be at least 1.");
159 // silently use at max all available logical cpus
160 let jobs = jobs.min(cpus());
161 rayon::ThreadPoolBuilder::new()
162 .num_threads(jobs)
163 .build_global()
164 .unwrap();
165}
166
167// Initialize the threadpool with [`threads`] threads. This method will take effect once and
168// must be called before the first call to [`mass_batch`]. Once the pool has been instantiated the threadpool is final.
169// The limitation on the global threadpool being final comes from the ['rayon'] dependency and is subject to change.
170#[cfg(not(feature = "auto"))]
171pub fn init_pool(threads: usize) {
172 JOBS_SET.call_once(|| start_pool(threads));
173}
174/// Masss batch finds top subsequence per batch the lowest distance profile for a given `query` and returns the top K subsequences.
175/// This behavior is useful when you want to filter adjacent suboptimal subsequences in each batch,
176/// where the local optimum overlaps with suboptima differing only by a few index strides.
177/// This method implements MASS V3 where chunks are split in powers of two and computed in parallel.
178/// Results are partitioned and not sorted, you can sort them afterwards if needed.
179pub fn mass_batch<T: MassType>(
180 ts: &[T],
181 query: &[T],
182 batch_size: usize,
183 top_matches: usize,
184) -> Vec<(usize, f64)> {
185 debug_assert!(batch_size > 0, "batch_size must be greater than 0.");
186 debug_assert!(top_matches > 0, "Match at least one.");
187
188 // TODO support nth top matches in parallel
189 // consider doing full nth top matches with a partition pseudosort per thread to ensure global optima.
190 let mut dists: Vec<_> = task_index(ts.len(), query.len(), batch_size)
191 .into_iter()
192 .par_bridge()
193 .map(|(l, h)| min_subsequence_distance(l, &ts[l..=h], query))
194 .collect();
195
196 assert!(
197 dists.len() >= top_matches,
198 format!(
199 "top_matches [{}] must be less or equal than the total batch count [{}], choose a smaller batch_size or less top_matches ",
200 top_matches,
201 dists.len()
202 )
203 );
204 dists.select_nth_unstable_by(top_matches - 1, |x, y| x.1.partial_cmp(&(y.1)).unwrap());
205
206 dists.iter().take(top_matches).copied().collect()
207}
208
209/// Generate the index for time series slices of size batch size; Batch size may be rounded to the nearest power of two.
210/// Rounding to the nearest power of two may panic! if the new batch size is greater than the time series' length.
211#[inline]
212fn task_index(
213 ts: usize,
214 query: usize,
215 mut batch_size: usize,
216) -> impl Iterator<Item = (usize, usize)> {
217 assert!(
218 batch_size > query,
219 "batch size must be greater than the query's length"
220 );
221
222 if !batch_size.is_power_of_two() {
223 batch_size = batch_size.next_power_of_two();
224 }
225
226 debug_assert!(
227 batch_size <= ts,
228 "batchsize after next power of two must be less or equal than series' length"
229 );
230 debug_assert!(
231 batch_size >= query,
232 "batchsize after next power of two must be greater or equal than query's length"
233 );
234
235 let step_size = batch_size - (query - 1);
236
237 let index = (0..ts - query)
238 .step_by(step_size)
239 .map(move |i| (i, (ts - 1).min(i + batch_size - 1)));
240 index
241}
242
243#[cfg(test)]
244pub mod tests {
245
246 use super::*;
247
248 #[test]
249 fn usize_div() {
250 assert_eq!(5usize / 2usize, 2);
251 }
252
253 // must run before any other call to [`mass_batch`] for it to pass. See [`init_pool`].
254 #[test]
255 #[cfg(not(feature = "auto"))]
256 fn init_tpool() {
257 let t = 4;
258 init_pool(t);
259 assert!(rayon::current_num_threads() == t);
260 }
261
262 #[test]
263 #[ignore = "for manual inspection purposes"]
264 fn jobs_range_0() {
265 let a = task_index(6, 2, 4);
266 for i in a {
267 print!("{:?}\n", i);
268 }
269 }
270
271 #[test]
272 fn jobs_range_1() {
273 let mut a = task_index(10, 4, 5);
274 assert!(a.next().unwrap() == (0, 7));
275 assert!(a.next().unwrap() == (5, 9));
276 assert!(a.next() == None);
277 }
278
279 #[test]
280 fn jobs_range_2() {
281 let mut a = task_index(6, 2, 4);
282 assert!(a.next().unwrap() == (0, 3));
283 assert!(a.next().unwrap() == (3, 5));
284 assert!(a.next() == None);
285 }
286
287 #[test]
288 fn jobs_range_3() {
289 let mut a = task_index(8, 2, 8);
290 assert!(a.next().unwrap() == (0, 7));
291 assert!(a.next() == None);
292 }
293 #[test]
294 fn jobs_range_4() {
295 let mut a = task_index(6, 3, 4);
296
297 assert!(a.next().unwrap() == (0, 3));
298 assert!(a.next().unwrap() == (2, 5));
299 assert!(a.next() == None);
300 }
301
302 #[test]
303 fn integration_1() {
304 let a = &[10., 3., 2., 3., 4.5, 6., 0., -1.];
305 let b = &[2., 3.];
306 let bsize = 4;
307 let c = mass_batch(a, b, bsize, 2);
308 println!("{:?}", c);
309 assert!(c[0].0 == 3);
310 }
311
312 #[test]
313 fn integration_2() {
314 let a = &[0., 10., 20., 30., 50., 10.];
315 let b = &[2., 3., 2.];
316 let c = mass_batch(a, b, 4, 1);
317 assert!(c[0].0 == 3);
318 }
319
320 //[´jobs´] greater that logical cores
321 #[test]
322 fn integration_3() {
323 let a = &[0., 10., 20., 30., 50., 10.];
324 let b = &[2., 3., 2.];
325 let c = mass_batch(a, b, 4, 1);
326 assert!(c[0].0 == 3);
327 }
328}