firedbg_stream_indexer/
database.rs1use std::collections::{HashMap, HashSet};
2
3use sea_orm::{
4 sea_query::{Expr, Query},
5 ActiveModelTrait, ConnectOptions, ConnectionTrait, Database as SeaDatabase, DbConn, DbErr,
6 EntityTrait, Schema,
7};
8use sea_streamer::file::AsyncFile;
9
10use crate::entity::{
11 breakpoint::{self, Entity as Breakpoint},
12 debugger_info::{self, Entity as DebuggerInfo},
13 event::{self, Entity as Event},
14 file::{self, Entity as File},
15 function::{self, Entity as Function},
16 type_info::{self, Entity as TypeInfo},
17};
18
19#[derive(Debug)]
20pub struct Database {
21 path: String,
22 db: Option<DbConn>,
23}
24
25impl Database {
26 pub async fn create(path: String) -> Result<Self, DbErr> {
27 AsyncFile::new_ow(path.parse().expect("UrlErr")) .await
29 .expect("File System Error");
30 let mut opt = ConnectOptions::new(format!("sqlite://{path}?mode=rw"));
31 opt.max_connections(1).sqlx_logging(false);
32 let db = SeaDatabase::connect(opt).await?;
33 create_tables(&db).await?;
34 Ok(Self { path, db: Some(db) })
35 }
36
37 pub async fn reopen(&mut self) -> Result<(), DbErr> {
38 if let Some(db) = self.db.take() {
40 db.close().await?; }
42 let mut opt = ConnectOptions::new(format!("sqlite://{}", self.path));
43 opt.max_connections(1).sqlx_logging(false);
44 let db = SeaDatabase::connect(opt).await?;
45 self.db = Some(db);
46 Ok(())
47 }
48
49 pub fn db(&self) -> &DbConn {
50 self.db.as_ref().expect("DB closed")
51 }
52
53 pub async fn close(&mut self) -> Result<(), DbErr> {
54 if let Some(db) = self.db.take() {
56 db.close().await?; }
58 Ok(())
59 }
60}
61
62pub async fn create_tables(db: &DbConn) -> Result<(), DbErr> {
63 let builder = db.get_database_backend();
64 let schema = Schema::new(builder);
65
66 let stmt = builder.build(&schema.create_table_from_entity(DebuggerInfo));
67 log::debug!("{stmt}");
68 db.execute(stmt).await?;
69
70 let stmt = builder.build(&schema.create_table_from_entity(File));
71 log::debug!("{stmt}");
72 db.execute(stmt).await?;
73
74 let stmt = builder.build(&schema.create_table_from_entity(Breakpoint));
75 log::debug!("{stmt}");
76 db.execute(stmt).await?;
77
78 let stmt = builder.build(&schema.create_table_from_entity(Event));
79 log::debug!("{stmt}");
80 db.execute(stmt).await?;
81 for stmt in schema.create_index_from_entity(Event) {
82 let stmt = builder.build(&stmt);
83 log::debug!("{stmt}");
84 db.execute(stmt).await?;
85 }
86
87 let stmt = builder.build(&schema.create_table_from_entity(Function));
88 log::debug!("{stmt}");
89 db.execute(stmt).await?;
90
91 let stmt = builder.build(&schema.create_table_from_entity(TypeInfo));
92 log::debug!("{stmt}");
93 db.execute(stmt).await?;
94
95 Ok(())
96}
97
98pub async fn save_debugger_info(
99 db: &Database,
100 info: debugger_info::ActiveModel,
101) -> Result<(), DbErr> {
102 let res = info.save(db.db()).await?;
103 log::debug!("DebuggerInfo::save: {:?}", res);
104 Ok(())
105}
106
107pub async fn insert_files(
108 db: &Database,
109 files: impl Iterator<Item = file::ActiveModel>,
110) -> Result<(), DbErr> {
111 let res = File::insert_many(files)
112 .on_empty_do_nothing()
113 .exec(db.db())
114 .await?;
115 log::debug!("File::insert_many: {:?}", res);
116 Ok(())
117}
118
119pub async fn insert_breakpoints(
120 db: &Database,
121 breakpoints: impl Iterator<Item = breakpoint::ActiveModel>,
122) -> Result<(), DbErr> {
123 let res = Breakpoint::insert_many(breakpoints)
124 .on_empty_do_nothing()
125 .exec(db.db())
126 .await?;
127 log::debug!("Breakpoint::insert_many: {:?}", res);
128 Ok(())
129}
130
131pub async fn insert_events(
132 db: &Database,
133 events: impl Iterator<Item = event::ActiveModel>,
134) -> Result<(), DbErr> {
135 let db = db.db();
136 let mut hits: HashMap<u32, usize> = Default::default();
137 let mut functions: HashSet<String> = Default::default();
138 let events: Vec<_> = events.collect();
139 for event in events.iter() {
140 if let Some(function_name) = event.function_name.as_ref() {
141 functions.insert(function_name.clone());
142 }
143 let bp_id = event.breakpoint_id.as_ref();
144 let hit = hits.entry(*bp_id).or_default();
145 *hit += 1;
146 }
147
148 let start = std::time::Instant::now();
149 let res = Event::insert_many(events)
150 .on_empty_do_nothing()
151 .exec(db)
152 .await?;
153 let duration = start.elapsed();
154 log::debug!("Event::insert_many: {:?} in {:?}", res, duration);
155
156 inc_hit_count(db, hits).await?;
157 insert_functions(db, functions).await?;
158
159 Ok(())
160}
161
162async fn inc_hit_count(db: &DbConn, breakpoints: HashMap<u32, usize>) -> Result<(), DbErr> {
163 let mut groups: HashMap<usize, Vec<u32>> = Default::default();
165 for (bp_id, count) in breakpoints {
166 let entry = groups.entry(count).or_default();
167 entry.push(bp_id);
168 }
169
170 use breakpoint::Column::HitCount;
171
172 for (count, bp_ids) in groups {
173 let mut update = Query::update();
174 update
175 .table(Breakpoint)
176 .value(HitCount, Expr::col(HitCount).add(count as u32))
177 .and_where(Expr::col(breakpoint::Column::Id).is_in(bp_ids));
178
179 let stmt = db.get_database_backend().build(&update);
180 log::debug!("{stmt}");
181 db.execute(stmt).await?;
182 }
183
184 Ok(())
185}
186
187async fn insert_functions(db: &DbConn, functions: HashSet<String>) -> Result<(), DbErr> {
188 use sea_orm::{sea_query::OnConflict, Set};
189
190 Function::insert_many(functions.into_iter().map(|n| function::ActiveModel {
191 function_name: Set(n),
192 }))
193 .on_conflict(OnConflict::new().do_nothing().to_owned())
194 .do_nothing()
195 .exec(db)
196 .await?;
197
198 Ok(())
199}
200
201pub async fn insert_type_info(
202 db: &Database,
203 type_info: impl Iterator<Item = type_info::Model>,
204) -> Result<(), DbErr> {
205 use sea_orm::{sea_query::OnConflict, IntoActiveModel};
206
207 TypeInfo::insert_many(type_info.map(|m| m.into_active_model()))
208 .on_conflict(OnConflict::new().do_nothing().to_owned())
209 .do_nothing()
210 .exec(db.db())
211 .await?;
212
213 Ok(())
214}