firedbg_stream_indexer/
database.rs

1use std::collections::{HashMap, HashSet};
2
3use sea_orm::{
4    sea_query::{Expr, Query},
5    ActiveModelTrait, ConnectOptions, ConnectionTrait, Database as SeaDatabase, DbConn, DbErr,
6    EntityTrait, Schema,
7};
8use sea_streamer::file::AsyncFile;
9
10use crate::entity::{
11    breakpoint::{self, Entity as Breakpoint},
12    debugger_info::{self, Entity as DebuggerInfo},
13    event::{self, Entity as Event},
14    file::{self, Entity as File},
15    function::{self, Entity as Function},
16    type_info::{self, Entity as TypeInfo},
17};
18
19#[derive(Debug)]
20pub struct Database {
21    path: String,
22    db: Option<DbConn>,
23}
24
25impl Database {
26    pub async fn create(path: String) -> Result<Self, DbErr> {
27        AsyncFile::new_ow(path.parse().expect("UrlErr")) // overwrite the file
28            .await
29            .expect("File System Error");
30        let mut opt = ConnectOptions::new(format!("sqlite://{path}?mode=rw"));
31        opt.max_connections(1).sqlx_logging(false);
32        let db = SeaDatabase::connect(opt).await?;
33        create_tables(&db).await?;
34        Ok(Self { path, db: Some(db) })
35    }
36
37    pub async fn reopen(&mut self) -> Result<(), DbErr> {
38        // Close existing db, if any
39        if let Some(db) = self.db.take() {
40            db.close().await?; // drop it
41        }
42        let mut opt = ConnectOptions::new(format!("sqlite://{}", self.path));
43        opt.max_connections(1).sqlx_logging(false);
44        let db = SeaDatabase::connect(opt).await?;
45        self.db = Some(db);
46        Ok(())
47    }
48
49    pub fn db(&self) -> &DbConn {
50        self.db.as_ref().expect("DB closed")
51    }
52
53    pub async fn close(&mut self) -> Result<(), DbErr> {
54        // Close existing db, if any
55        if let Some(db) = self.db.take() {
56            db.close().await?; // drop it
57        }
58        Ok(())
59    }
60}
61
62pub async fn create_tables(db: &DbConn) -> Result<(), DbErr> {
63    let builder = db.get_database_backend();
64    let schema = Schema::new(builder);
65
66    let stmt = builder.build(&schema.create_table_from_entity(DebuggerInfo));
67    log::debug!("{stmt}");
68    db.execute(stmt).await?;
69
70    let stmt = builder.build(&schema.create_table_from_entity(File));
71    log::debug!("{stmt}");
72    db.execute(stmt).await?;
73
74    let stmt = builder.build(&schema.create_table_from_entity(Breakpoint));
75    log::debug!("{stmt}");
76    db.execute(stmt).await?;
77
78    let stmt = builder.build(&schema.create_table_from_entity(Event));
79    log::debug!("{stmt}");
80    db.execute(stmt).await?;
81    for stmt in schema.create_index_from_entity(Event) {
82        let stmt = builder.build(&stmt);
83        log::debug!("{stmt}");
84        db.execute(stmt).await?;
85    }
86
87    let stmt = builder.build(&schema.create_table_from_entity(Function));
88    log::debug!("{stmt}");
89    db.execute(stmt).await?;
90
91    let stmt = builder.build(&schema.create_table_from_entity(TypeInfo));
92    log::debug!("{stmt}");
93    db.execute(stmt).await?;
94
95    Ok(())
96}
97
98pub async fn save_debugger_info(
99    db: &Database,
100    info: debugger_info::ActiveModel,
101) -> Result<(), DbErr> {
102    let res = info.save(db.db()).await?;
103    log::debug!("DebuggerInfo::save: {:?}", res);
104    Ok(())
105}
106
107pub async fn insert_files(
108    db: &Database,
109    files: impl Iterator<Item = file::ActiveModel>,
110) -> Result<(), DbErr> {
111    let res = File::insert_many(files)
112        .on_empty_do_nothing()
113        .exec(db.db())
114        .await?;
115    log::debug!("File::insert_many: {:?}", res);
116    Ok(())
117}
118
119pub async fn insert_breakpoints(
120    db: &Database,
121    breakpoints: impl Iterator<Item = breakpoint::ActiveModel>,
122) -> Result<(), DbErr> {
123    let res = Breakpoint::insert_many(breakpoints)
124        .on_empty_do_nothing()
125        .exec(db.db())
126        .await?;
127    log::debug!("Breakpoint::insert_many: {:?}", res);
128    Ok(())
129}
130
131pub async fn insert_events(
132    db: &Database,
133    events: impl Iterator<Item = event::ActiveModel>,
134) -> Result<(), DbErr> {
135    let db = db.db();
136    let mut hits: HashMap<u32, usize> = Default::default();
137    let mut functions: HashSet<String> = Default::default();
138    let events: Vec<_> = events.collect();
139    for event in events.iter() {
140        if let Some(function_name) = event.function_name.as_ref() {
141            functions.insert(function_name.clone());
142        }
143        let bp_id = event.breakpoint_id.as_ref();
144        let hit = hits.entry(*bp_id).or_default();
145        *hit += 1;
146    }
147
148    let start = std::time::Instant::now();
149    let res = Event::insert_many(events)
150        .on_empty_do_nothing()
151        .exec(db)
152        .await?;
153    let duration = start.elapsed();
154    log::debug!("Event::insert_many: {:?} in {:?}", res, duration);
155
156    inc_hit_count(db, hits).await?;
157    insert_functions(db, functions).await?;
158
159    Ok(())
160}
161
162async fn inc_hit_count(db: &DbConn, breakpoints: HashMap<u32, usize>) -> Result<(), DbErr> {
163    // we batch the updates for all the breakpoints hit by N
164    let mut groups: HashMap<usize, Vec<u32>> = Default::default();
165    for (bp_id, count) in breakpoints {
166        let entry = groups.entry(count).or_default();
167        entry.push(bp_id);
168    }
169
170    use breakpoint::Column::HitCount;
171
172    for (count, bp_ids) in groups {
173        let mut update = Query::update();
174        update
175            .table(Breakpoint)
176            .value(HitCount, Expr::col(HitCount).add(count as u32))
177            .and_where(Expr::col(breakpoint::Column::Id).is_in(bp_ids));
178
179        let stmt = db.get_database_backend().build(&update);
180        log::debug!("{stmt}");
181        db.execute(stmt).await?;
182    }
183
184    Ok(())
185}
186
187async fn insert_functions(db: &DbConn, functions: HashSet<String>) -> Result<(), DbErr> {
188    use sea_orm::{sea_query::OnConflict, Set};
189
190    Function::insert_many(functions.into_iter().map(|n| function::ActiveModel {
191        function_name: Set(n),
192    }))
193    .on_conflict(OnConflict::new().do_nothing().to_owned())
194    .do_nothing()
195    .exec(db)
196    .await?;
197
198    Ok(())
199}
200
201pub async fn insert_type_info(
202    db: &Database,
203    type_info: impl Iterator<Item = type_info::Model>,
204) -> Result<(), DbErr> {
205    use sea_orm::{sea_query::OnConflict, IntoActiveModel};
206
207    TypeInfo::insert_many(type_info.map(|m| m.into_active_model()))
208        .on_conflict(OnConflict::new().do_nothing().to_owned())
209        .do_nothing()
210        .exec(db.db())
211        .await?;
212
213    Ok(())
214}