lb_rs/service/
activity.rs1use crate::model::errors::LbResult;
2use crate::model::tree_like::TreeLike;
3use crate::Lb;
4use serde::Deserialize;
5use serde::Serialize;
6use std::cmp;
7use std::cmp::Ordering;
8use std::collections::HashMap;
9use uuid::Uuid;
10
11impl Lb {
12 #[instrument(level = "debug", skip(self), err(Debug))]
13 pub async fn suggested_docs(&self, settings: RankingWeights) -> LbResult<Vec<Uuid>> {
14 let db = self.ro_tx().await;
15 let db = db.db();
16
17 let mut scores = db.doc_events.get().iter().get_activity_metrics();
18 self.normalize(&mut scores);
19
20 scores.sort_unstable_by_key(|b| cmp::Reverse(b.score(settings)));
21
22 scores.truncate(10);
23
24 let mut result = Vec::new();
25 let mut tree = (&db.base_metadata).to_staged(&db.local_metadata).to_lazy();
26 for score in scores {
27 if tree.maybe_find(&score.id).is_none() {
28 continue;
29 }
30
31 if tree.calculate_deleted(&score.id)? {
32 continue;
33 }
34 if tree.in_pending_share(&score.id)? {
35 continue;
36 }
37 result.push(score.id);
38 }
39
40 Ok(result)
41 }
42
43 pub(crate) async fn add_doc_event(&self, event: DocEvent) -> LbResult<()> {
44 let mut tx = self.begin_tx().await;
45 let db = tx.db();
46
47 let max_stored_events = 1000;
48 let events = &db.doc_events;
49
50 if events.get().len() > max_stored_events {
51 db.doc_events.remove(0)?;
52 }
53 db.doc_events.push(event)?;
54 Ok(())
55 }
56
57 pub(crate) fn normalize(&self, docs: &mut [DocActivityMetrics]) {
58 let read_count_range = StatisticValueRange {
59 max: docs.iter().map(|f| f.read_count).max().unwrap_or_default(),
60 min: docs.iter().map(|f| f.read_count).min().unwrap_or_default(),
61 };
62
63 let write_count_range = StatisticValueRange {
64 max: docs.iter().map(|f| f.write_count).max().unwrap_or_default(),
65 min: docs.iter().map(|f| f.write_count).min().unwrap_or_default(),
66 };
67
68 let last_read_range = StatisticValueRange {
69 max: docs
70 .iter()
71 .map(|f| f.last_read_timestamp)
72 .max()
73 .unwrap_or_default(),
74 min: docs
75 .iter()
76 .map(|f| f.last_read_timestamp)
77 .min()
78 .unwrap_or_default(),
79 };
80 let last_write_range = StatisticValueRange {
81 max: docs
82 .iter()
83 .map(|f| f.last_write_timestamp)
84 .max()
85 .unwrap_or_default(),
86 min: docs
87 .iter()
88 .map(|f| f.last_write_timestamp)
89 .min()
90 .unwrap_or_default(),
91 };
92
93 docs.iter_mut().for_each(|f| {
94 f.read_count.normalize(read_count_range);
95 f.write_count.normalize(write_count_range);
96 f.last_read_timestamp.normalize(last_read_range);
97 f.last_write_timestamp.normalize(last_write_range);
98 });
99 }
100}
101
102#[derive(Debug, Clone, Serialize, Deserialize, Ord, PartialEq, PartialOrd, Eq, Hash)]
103pub enum DocEvent {
104 Read(Uuid, i64),
105 Write(Uuid, i64),
106}
107impl DocEvent {
108 pub fn timestamp(&self) -> i64 {
109 match *self {
110 DocEvent::Read(_, timestamp) => timestamp,
111 DocEvent::Write(_, timestamp) => timestamp,
112 }
113 }
114 pub fn id(&self) -> Uuid {
115 match *self {
116 DocEvent::Read(id, _) => id,
117 DocEvent::Write(id, _) => id,
118 }
119 }
120}
121
/// Relative weights that `DocActivityMetrics::score` uses to combine the
/// normalized statistics into a single ranking score.
#[derive(Debug, Copy, Clone)]
pub struct RankingWeights {
    /// Multiplier for the sum of the normalized last-read and last-write timestamps.
    pub temporality: i64,
    /// Multiplier for the sum of the normalized read and write counts.
    pub io: i64,
}

impl Default for RankingWeights {
    /// Weights recency at 60 and activity volume at 40.
    fn default() -> Self {
        let temporality = 60;
        let io = 40;
        RankingWeights { temporality, io }
    }
}
/// A raw integer statistic together with its value rescaled into a shared range.
///
/// Equality and ordering consider only `raw`. `normalized` is derived from `raw`
/// by [`StatisticValue::normalize`], and it is an `f64`, which has no total order.
/// Including it would make `==` disagree with `cmp` and make the `Eq` impl unsound.
#[derive(Debug, Default, Copy, Clone)]
pub struct StatisticValue {
    /// The measured value, e.g. an event count or a timestamp.
    pub raw: i64,
    /// `raw` rescaled by [`StatisticValue::normalize`]; `None` until normalized.
    pub normalized: Option<f64>,
}

impl PartialEq for StatisticValue {
    fn eq(&self, other: &Self) -> bool {
        // Must agree with `Ord::cmp` below, which only looks at `raw`.
        self.raw == other.raw
    }
}

impl Eq for StatisticValue {}

impl Ord for StatisticValue {
    fn cmp(&self, other: &Self) -> Ordering {
        self.raw.cmp(&other.raw)
    }
}

impl PartialOrd for StatisticValue {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

/// The minimum and maximum of one statistic across a set of values, used as the
/// reference span for [`StatisticValue::normalize`].
#[derive(Clone, Copy)]
pub struct StatisticValueRange {
    pub max: StatisticValue,
    pub min: StatisticValue,
}

impl StatisticValue {
    /// Rescales `raw` linearly so that `range.min` maps to 0.0 and `range.max` to 1.0,
    /// and stores the result in `normalized`.
    ///
    /// When the range has zero width (`max.raw == min.raw`), the divisor is clamped
    /// to 1 to avoid dividing by zero, so a value equal to `min` normalizes to 0.0.
    pub fn normalize(&mut self, range: StatisticValueRange) {
        let span = match range.max.raw - range.min.raw {
            0 => 1,
            width => width,
        };
        self.normalized = Some((self.raw - range.min.raw) as f64 / span as f64);
    }
}
170#[derive(Default, Copy, Clone, PartialEq)]
173pub struct DocActivityMetrics {
174 pub id: Uuid,
175 pub last_read_timestamp: StatisticValue,
177 pub last_write_timestamp: StatisticValue,
179 pub read_count: StatisticValue,
181 pub write_count: StatisticValue,
183}
184
185impl DocActivityMetrics {
186 pub fn score(&self, weights: RankingWeights) -> i64 {
187 let timestamp_weight = weights.temporality;
188 let io_count_weight = weights.io;
189
190 let temporality_score = (self.last_read_timestamp.normalized.unwrap_or_default()
191 + self.last_write_timestamp.normalized.unwrap_or_default())
192 * timestamp_weight as f64;
193
194 let io_score = (self.read_count.normalized.unwrap_or_default()
195 + self.write_count.normalized.unwrap_or_default())
196 * io_count_weight as f64;
197
198 (io_score + temporality_score).ceil() as i64
199 }
200}
201pub trait Stats {
202 fn get_activity_metrics(self) -> Vec<DocActivityMetrics>;
203}
204impl<'a, T> Stats for T
205where
206 T: Iterator<Item = &'a DocEvent>,
207{
208 fn get_activity_metrics(self) -> Vec<DocActivityMetrics> {
209 let mut result = Vec::new();
210
211 let mut set = HashMap::new();
212 for event in self {
213 match set.get_mut(&event.id()) {
214 None => {
215 set.insert(event.id(), vec![event]);
216 }
217 Some(events) => {
218 events.push(event);
219 }
220 }
221 }
222
223 for (id, events) in set {
224 let read_events = events.iter().filter(|e| matches!(e, DocEvent::Read(_, _)));
225
226 let last_read = read_events
227 .clone()
228 .max_by(|x, y| x.timestamp().cmp(&y.timestamp()));
229
230 let last_read = match last_read {
231 None => 0,
232 Some(x) => x.timestamp(),
233 };
234
235 let write_events = events.iter().filter(|e| matches!(e, DocEvent::Write(_, _)));
236
237 let last_write = write_events
238 .clone()
239 .max_by(|x, y| x.timestamp().cmp(&y.timestamp()));
240 let last_write = match last_write {
241 None => 0,
242 Some(x) => x.timestamp(),
243 };
244
245 let metrics = DocActivityMetrics {
246 id,
247 last_read_timestamp: StatisticValue { raw: last_read, normalized: None },
248 last_write_timestamp: StatisticValue { raw: last_write, normalized: None },
249 read_count: StatisticValue { raw: read_events.count() as i64, normalized: None },
250 write_count: StatisticValue { raw: write_events.count() as i64, normalized: None },
251 };
252 result.push(metrics);
253 }
254
255 result
256 }
257}