// lb_rs/service/activity.rs

1use crate::model::errors::LbResult;
2use crate::model::tree_like::TreeLike;
3use crate::Lb;
4use serde::Deserialize;
5use serde::Serialize;
6use std::cmp;
7use std::cmp::Ordering;
8use std::collections::HashMap;
9use uuid::Uuid;
10
11impl Lb {
12    #[instrument(level = "debug", skip(self), err(Debug))]
13    pub async fn suggested_docs(&self, settings: RankingWeights) -> LbResult<Vec<Uuid>> {
14        let db = self.ro_tx().await;
15        let db = db.db();
16
17        let mut scores = db.doc_events.get().iter().get_activity_metrics();
18        self.normalize(&mut scores);
19
20        scores.sort_unstable_by_key(|b| cmp::Reverse(b.score(settings)));
21
22        scores.truncate(10);
23
24        let mut result = Vec::new();
25        let mut tree = (&db.base_metadata).to_staged(&db.local_metadata).to_lazy();
26        for score in scores {
27            if tree.maybe_find(&score.id).is_none() {
28                continue;
29            }
30
31            if tree.calculate_deleted(&score.id)? {
32                continue;
33            }
34            if tree.in_pending_share(&score.id)? {
35                continue;
36            }
37            result.push(score.id);
38        }
39
40        Ok(result)
41    }
42
43    pub(crate) async fn add_doc_event(&self, event: DocEvent) -> LbResult<()> {
44        let mut tx = self.begin_tx().await;
45        let db = tx.db();
46
47        let max_stored_events = 1000;
48        let events = &db.doc_events;
49
50        if events.get().len() > max_stored_events {
51            db.doc_events.remove(0)?;
52        }
53        db.doc_events.push(event)?;
54        Ok(())
55    }
56
57    pub(crate) fn normalize(&self, docs: &mut [DocActivityMetrics]) {
58        let read_count_range = StatisticValueRange {
59            max: docs.iter().map(|f| f.read_count).max().unwrap_or_default(),
60            min: docs.iter().map(|f| f.read_count).min().unwrap_or_default(),
61        };
62
63        let write_count_range = StatisticValueRange {
64            max: docs.iter().map(|f| f.write_count).max().unwrap_or_default(),
65            min: docs.iter().map(|f| f.write_count).min().unwrap_or_default(),
66        };
67
68        let last_read_range = StatisticValueRange {
69            max: docs
70                .iter()
71                .map(|f| f.last_read_timestamp)
72                .max()
73                .unwrap_or_default(),
74            min: docs
75                .iter()
76                .map(|f| f.last_read_timestamp)
77                .min()
78                .unwrap_or_default(),
79        };
80        let last_write_range = StatisticValueRange {
81            max: docs
82                .iter()
83                .map(|f| f.last_write_timestamp)
84                .max()
85                .unwrap_or_default(),
86            min: docs
87                .iter()
88                .map(|f| f.last_write_timestamp)
89                .min()
90                .unwrap_or_default(),
91        };
92
93        docs.iter_mut().for_each(|f| {
94            f.read_count.normalize(read_count_range);
95            f.write_count.normalize(write_count_range);
96            f.last_read_timestamp.normalize(last_read_range);
97            f.last_write_timestamp.normalize(last_write_range);
98        });
99    }
100}
101
102#[derive(Debug, Clone, Serialize, Deserialize, Ord, PartialEq, PartialOrd, Eq, Hash)]
103pub enum DocEvent {
104    Read(Uuid, i64),
105    Write(Uuid, i64),
106}
107impl DocEvent {
108    pub fn timestamp(&self) -> i64 {
109        match *self {
110            DocEvent::Read(_, timestamp) => timestamp,
111            DocEvent::Write(_, timestamp) => timestamp,
112        }
113    }
114    pub fn id(&self) -> Uuid {
115        match *self {
116            DocEvent::Read(id, _) => id,
117            DocEvent::Write(id, _) => id,
118        }
119    }
120}
121
/// Relative weights of the two components that make up a document's activity score.
#[derive(Debug, Copy, Clone)]
pub struct RankingWeights {
    /// weight given to recency, i.e. how lately the doc was read or written
    pub temporality: i64,
    /// weight given to volume, i.e. how many reads and writes the doc received
    pub io: i64,
}

impl Default for RankingWeights {
    /// Leans toward recency: 60 for `temporality`, 40 for `io`.
    fn default() -> Self {
        let temporality = 60;
        let io = 40;
        RankingWeights { temporality, io }
    }
}
/// One activity feature of a document: its raw value and, once computed, its min-max
/// normalized counterpart.
#[derive(Debug, Default, Copy, Clone, PartialEq)]
pub struct StatisticValue {
    /// the value as measured (an event count or a timestamp)
    pub raw: i64,
    /// `raw` rescaled by [`StatisticValue::normalize`]; `None` until normalization runs
    pub normalized: Option<f64>,
}

/// Orders primarily by `raw`. Ties are broken by `normalized` (`None` first, then a total order
/// on the float), so `cmp` returns `Equal` exactly when the derived `==` holds, as the
/// `PartialOrd`/`Ord` contracts require. Comparing only `raw` violated that whenever two
/// values shared a raw but had different normalized values.
///
/// NOTE: `normalized` is an `f64`, so `NaN` or a `-0.0`/`0.0` pair stored in it can still make
/// `==` and `cmp` disagree.
impl Ord for StatisticValue {
    fn cmp(&self, other: &Self) -> Ordering {
        self.raw
            .cmp(&other.raw)
            .then_with(|| match (self.normalized, other.normalized) {
                (Some(lhs), Some(rhs)) => lhs.total_cmp(&rhs),
                (lhs, rhs) => lhs.is_some().cmp(&rhs.is_some()),
            })
    }
}

impl PartialOrd for StatisticValue {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Eq for StatisticValue {}

/// The minimum and maximum of one feature across a set of documents.
#[derive(Debug, Clone, Copy)]
pub struct StatisticValueRange {
    pub max: StatisticValue,
    pub min: StatisticValue,
}

impl StatisticValue {
    /// Sets `normalized` to `(raw - range.min.raw) / (range.max.raw - range.min.raw)`.
    ///
    /// A degenerate range (`max == min`) uses a divisor of 1 instead of dividing by zero.
    /// The differences are taken in `i128`, so extreme `i64` values cannot overflow. The
    /// result is the same as before for every input that did not overflow.
    pub fn normalize(&mut self, range: StatisticValueRange) {
        let min = i128::from(range.min.raw);
        let max = i128::from(range.max.raw);
        let distance = match max - min {
            0 => 1,
            d => d,
        };
        self.normalized = Some((i128::from(self.raw) - min) as f64 / distance as f64);
    }
}
170/// DocActivityMetrics stores key document activity features, which are used to recommend relevant documents to the user.
171/// Here's a walkthrough of the recommendation procedure: collect 1k most recent document events (write/read), use that activity to construct a DocActivtyMetrics struct for each document. Min-max normalizes the activity features, then rank the documents.
172#[derive(Default, Copy, Clone, PartialEq)]
173pub struct DocActivityMetrics {
174    pub id: Uuid,
175    /// the latest epoch timestamp that the user read a document
176    pub last_read_timestamp: StatisticValue,
177    /// the latest epoch timestamp that the user wrote a document
178    pub last_write_timestamp: StatisticValue,
179    /// the total number of times that a user reads a document
180    pub read_count: StatisticValue,
181    /// the total number of times that a user wrote a document
182    pub write_count: StatisticValue,
183}
184
185impl DocActivityMetrics {
186    pub fn score(&self, weights: RankingWeights) -> i64 {
187        let timestamp_weight = weights.temporality;
188        let io_count_weight = weights.io;
189
190        let temporality_score = (self.last_read_timestamp.normalized.unwrap_or_default()
191            + self.last_write_timestamp.normalized.unwrap_or_default())
192            * timestamp_weight as f64;
193
194        let io_score = (self.read_count.normalized.unwrap_or_default()
195            + self.write_count.normalized.unwrap_or_default())
196            * io_count_weight as f64;
197
198        (io_score + temporality_score).ceil() as i64
199    }
200}
201pub trait Stats {
202    fn get_activity_metrics(self) -> Vec<DocActivityMetrics>;
203}
204impl<'a, T> Stats for T
205where
206    T: Iterator<Item = &'a DocEvent>,
207{
208    fn get_activity_metrics(self) -> Vec<DocActivityMetrics> {
209        let mut result = Vec::new();
210
211        let mut set = HashMap::new();
212        for event in self {
213            match set.get_mut(&event.id()) {
214                None => {
215                    set.insert(event.id(), vec![event]);
216                }
217                Some(events) => {
218                    events.push(event);
219                }
220            }
221        }
222
223        for (id, events) in set {
224            let read_events = events.iter().filter(|e| matches!(e, DocEvent::Read(_, _)));
225
226            let last_read = read_events
227                .clone()
228                .max_by(|x, y| x.timestamp().cmp(&y.timestamp()));
229
230            let last_read = match last_read {
231                None => 0,
232                Some(x) => x.timestamp(),
233            };
234
235            let write_events = events.iter().filter(|e| matches!(e, DocEvent::Write(_, _)));
236
237            let last_write = write_events
238                .clone()
239                .max_by(|x, y| x.timestamp().cmp(&y.timestamp()));
240            let last_write = match last_write {
241                None => 0,
242                Some(x) => x.timestamp(),
243            };
244
245            let metrics = DocActivityMetrics {
246                id,
247                last_read_timestamp: StatisticValue { raw: last_read, normalized: None },
248                last_write_timestamp: StatisticValue { raw: last_write, normalized: None },
249                read_count: StatisticValue { raw: read_events.count() as i64, normalized: None },
250                write_count: StatisticValue { raw: write_events.count() as i64, normalized: None },
251            };
252            result.push(metrics);
253        }
254
255        result
256    }
257}