1use crate::model::errors::LbResult;
2use crate::model::tree_like::TreeLike;
3use crate::Lb;
4use serde::Deserialize;
5use serde::Serialize;
6use std::cmp;
7use std::cmp::Ordering;
8use std::collections::HashMap;
9use uuid::Uuid;
10
11impl Lb {
12 #[instrument(level = "debug", skip(self), err(Debug))]
13 pub async fn suggested_docs(&self, settings: RankingWeights) -> LbResult<Vec<Uuid>> {
14 let db = self.ro_tx().await;
15 let db = db.db();
16
17 let mut scores = db.doc_events.get().iter().get_activity_metrics();
18 self.normalize(&mut scores);
19
20 scores.sort_unstable_by_key(|b| cmp::Reverse(b.score(settings)));
21
22 scores.truncate(10);
23
24 let mut result = Vec::new();
25 let mut tree = (&db.base_metadata).to_staged(&db.local_metadata).to_lazy();
26 for score in scores {
27 if tree.maybe_find(&score.id).is_none() {
28 continue;
29 }
30
31 if tree.calculate_deleted(&score.id)? {
32 continue;
33 }
34 if tree.in_pending_share(&score.id)? {
35 continue;
36 }
37 result.push(score.id);
38 }
39
40 Ok(result)
41 }
42
43 #[instrument(level = "debug", skip(self), err(Debug))]
44 pub async fn clear_suggested(&self) -> LbResult<()> {
45 let mut tx = self.begin_tx().await;
46 let db = tx.db();
47 db.doc_events.clear()?;
48 Ok(())
49 }
50
51 #[instrument(level = "debug", skip(self), err(Debug))]
52 pub async fn clear_suggested_id(&self, id: Uuid) -> LbResult<()> {
53 let mut tx = self.begin_tx().await;
54 let db = tx.db();
55
56 let mut entries = db.doc_events.get().to_vec();
57 db.doc_events.clear()?;
58 entries.retain(|e| e.id() != id);
59 for entry in entries {
60 db.doc_events.push(entry)?;
61 }
62
63 Ok(())
64 }
65
66 pub(crate) async fn add_doc_event(&self, event: DocEvent) -> LbResult<()> {
67 let mut tx = self.begin_tx().await;
68 let db = tx.db();
69
70 let max_stored_events = 1000;
71 let events = &db.doc_events;
72
73 if events.get().len() > max_stored_events {
74 db.doc_events.remove(0)?;
75 }
76 db.doc_events.push(event)?;
77 Ok(())
78 }
79
80 pub(crate) fn normalize(&self, docs: &mut [DocActivityMetrics]) {
81 let read_count_range = StatisticValueRange {
82 max: docs.iter().map(|f| f.read_count).max().unwrap_or_default(),
83 min: docs.iter().map(|f| f.read_count).min().unwrap_or_default(),
84 };
85
86 let write_count_range = StatisticValueRange {
87 max: docs.iter().map(|f| f.write_count).max().unwrap_or_default(),
88 min: docs.iter().map(|f| f.write_count).min().unwrap_or_default(),
89 };
90
91 let last_read_range = StatisticValueRange {
92 max: docs
93 .iter()
94 .map(|f| f.last_read_timestamp)
95 .max()
96 .unwrap_or_default(),
97 min: docs
98 .iter()
99 .map(|f| f.last_read_timestamp)
100 .min()
101 .unwrap_or_default(),
102 };
103 let last_write_range = StatisticValueRange {
104 max: docs
105 .iter()
106 .map(|f| f.last_write_timestamp)
107 .max()
108 .unwrap_or_default(),
109 min: docs
110 .iter()
111 .map(|f| f.last_write_timestamp)
112 .min()
113 .unwrap_or_default(),
114 };
115
116 docs.iter_mut().for_each(|f| {
117 f.read_count.normalize(read_count_range);
118 f.write_count.normalize(write_count_range);
119 f.last_read_timestamp.normalize(last_read_range);
120 f.last_write_timestamp.normalize(last_write_range);
121 });
122 }
123}
124
125#[derive(Debug, Clone, Serialize, Deserialize, Ord, PartialEq, PartialOrd, Eq, Hash)]
126pub enum DocEvent {
127 Read(Uuid, i64),
128 Write(Uuid, i64),
129}
130impl DocEvent {
131 pub fn timestamp(&self) -> i64 {
132 match *self {
133 DocEvent::Read(_, timestamp) => timestamp,
134 DocEvent::Write(_, timestamp) => timestamp,
135 }
136 }
137 pub fn id(&self) -> Uuid {
138 match *self {
139 DocEvent::Read(id, _) => id,
140 DocEvent::Write(id, _) => id,
141 }
142 }
143}
144
/// Relative weights used when scoring document activity.
///
/// `temporality` weighs how recently a document was touched, `io` weighs how
/// often it was touched; see `DocActivityMetrics::score`.
#[derive(Debug, Copy, Clone)]
pub struct RankingWeights {
    pub temporality: i64,
    pub io: i64,
}

impl Default for RankingWeights {
    /// By default, recency counts slightly more than raw I/O volume.
    fn default() -> Self {
        RankingWeights { temporality: 60, io: 40 }
    }
}
/// A raw metric sample plus its min-max normalized form.
///
/// Equality and ordering consider only `raw`, so values compare the same
/// before and after normalization and `Ord` stays consistent with `Eq`.
#[derive(Default, Copy, Clone)]
pub struct StatisticValue {
    pub raw: i64,
    pub normalized: Option<f64>,
}

impl PartialEq for StatisticValue {
    /// Compares `raw` only. The previously derived impl also compared
    /// `normalized`, which disagreed with `Ord`: two values with equal `raw`
    /// but different `normalized` were `cmp`-Equal yet not `==`, violating
    /// `Ord`'s consistency requirement with `Eq`.
    fn eq(&self, other: &Self) -> bool {
        self.raw == other.raw
    }
}

impl Eq for StatisticValue {}

impl Ord for StatisticValue {
    fn cmp(&self, other: &Self) -> Ordering {
        self.raw.cmp(&other.raw)
    }
}

impl PartialOrd for StatisticValue {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

/// The observed min/max spread of one statistic across all documents.
#[derive(Clone, Copy)]
pub struct StatisticValueRange {
    pub max: StatisticValue,
    pub min: StatisticValue,
}

impl StatisticValue {
    /// Min-max scales `raw` into [0, 1] relative to `range`, storing the
    /// result in `normalized`.
    ///
    /// A degenerate range (`max == min`) is treated as width 1 to avoid a
    /// division by zero, yielding 0 for every value in that range.
    pub fn normalize(&mut self, range: StatisticValueRange) {
        let mut span = range.max.raw - range.min.raw;
        if span == 0 {
            span = 1
        };
        let normalized = (self.raw - range.min.raw) as f64 / span as f64;
        self.normalized = Some(normalized);
    }
}
/// Per-document activity summary derived from the doc event log.
#[derive(Default, Copy, Clone, PartialEq)]
pub struct DocActivityMetrics {
    /// Id of the document these metrics describe.
    pub id: Uuid,
    /// Timestamp of the most recent read event (raw 0 if never read).
    pub last_read_timestamp: StatisticValue,
    /// Timestamp of the most recent write event (raw 0 if never written).
    pub last_write_timestamp: StatisticValue,
    /// Number of read events recorded for this document.
    pub read_count: StatisticValue,
    /// Number of write events recorded for this document.
    pub write_count: StatisticValue,
}
207
208impl DocActivityMetrics {
209 pub fn score(&self, weights: RankingWeights) -> i64 {
210 let timestamp_weight = weights.temporality;
211 let io_count_weight = weights.io;
212
213 let temporality_score = (self.last_read_timestamp.normalized.unwrap_or_default()
214 + self.last_write_timestamp.normalized.unwrap_or_default())
215 * timestamp_weight as f64;
216
217 let io_score = (self.read_count.normalized.unwrap_or_default()
218 + self.write_count.normalized.unwrap_or_default())
219 * io_count_weight as f64;
220
221 (io_score + temporality_score).ceil() as i64
222 }
223}
/// Aggregates a stream of [`DocEvent`]s into per-document activity metrics.
pub trait Stats {
    /// Consumes the event stream and returns one metrics entry per distinct
    /// document id seen.
    fn get_activity_metrics(self) -> Vec<DocActivityMetrics>;
}
227impl<'a, T> Stats for T
228where
229 T: Iterator<Item = &'a DocEvent>,
230{
231 fn get_activity_metrics(self) -> Vec<DocActivityMetrics> {
232 let mut result = Vec::new();
233
234 let mut set = HashMap::new();
235 for event in self {
236 match set.get_mut(&event.id()) {
237 None => {
238 set.insert(event.id(), vec![event]);
239 }
240 Some(events) => {
241 events.push(event);
242 }
243 }
244 }
245
246 for (id, events) in set {
247 let read_events = events.iter().filter(|e| matches!(e, DocEvent::Read(_, _)));
248
249 let last_read = read_events
250 .clone()
251 .max_by(|x, y| x.timestamp().cmp(&y.timestamp()));
252
253 let last_read = match last_read {
254 None => 0,
255 Some(x) => x.timestamp(),
256 };
257
258 let write_events = events.iter().filter(|e| matches!(e, DocEvent::Write(_, _)));
259
260 let last_write = write_events
261 .clone()
262 .max_by(|x, y| x.timestamp().cmp(&y.timestamp()));
263 let last_write = match last_write {
264 None => 0,
265 Some(x) => x.timestamp(),
266 };
267
268 let metrics = DocActivityMetrics {
269 id,
270 last_read_timestamp: StatisticValue { raw: last_read, normalized: None },
271 last_write_timestamp: StatisticValue { raw: last_write, normalized: None },
272 read_count: StatisticValue { raw: read_events.count() as i64, normalized: None },
273 write_count: StatisticValue { raw: write_events.count() as i64, normalized: None },
274 };
275 result.push(metrics);
276 }
277
278 result
279 }
280}