semilattice_database_session/
lib.rs1mod commit;
2mod session;
3mod update;
4
5pub use semilattice_database::{
6 search, Activity, Collection, CollectionRow, Condition, CustomOrderKey, CustomSort, DataOption,
7 Depend, FieldName, Order, OrderKey, SearchResult, Term, Uuid,
8};
9pub use session::{
10 Depends, Pend, Session, SessionCustomOrder, SessionOrder, SessionOrderKey, SessionRecord,
11 SessionSearchResult,
12};
13
14use std::{
15 io::Read,
16 num::{NonZeroI32, NonZeroI64, NonZeroU32},
17 path::PathBuf,
18 sync::Arc,
19 time::{self, UNIX_EPOCH},
20};
21
22use hashbrown::HashMap;
23use semilattice_database::{idx_binary::AvltrieeUpdate, Database, Field, FileMmap, IdxFile};
24use session::SessionInfo;
25
26pub struct SessionDatabase {
27 database: Database,
28 sessions_dir: PathBuf,
29}
30
31impl std::ops::Deref for SessionDatabase {
32 type Target = Database;
33 fn deref(&self) -> &Self::Target {
34 &self.database
35 }
36}
37impl std::ops::DerefMut for SessionDatabase {
38 fn deref_mut(&mut self) -> &mut Self::Target {
39 &mut self.database
40 }
41}
42
43impl SessionDatabase {
44 pub fn new(
45 dir: PathBuf,
46 collection_settings: Option<std::collections::HashMap<String, DataOption>>,
47 relation_reserve_unit: u32,
48 ) -> Self {
49 let database = Database::new(dir.clone(), collection_settings, relation_reserve_unit);
50 let mut sessions_dir = dir.to_path_buf();
51 sessions_dir.push("sessions");
52 Self {
53 database,
54 sessions_dir,
55 }
56 }
57 pub fn sessions(&self) -> Vec<SessionInfo> {
58 let mut sessions = Vec::new();
59 if self.sessions_dir.exists() {
60 let dir = self.sessions_dir.read_dir().unwrap();
61 for d in dir.into_iter() {
62 let d = d.unwrap();
63 if d.file_type().unwrap().is_dir() {
64 if let Some(fname) = d.file_name().to_str() {
65 let mut access_at = 0;
66 let mut expire = 0;
67
68 let mut expire_file = d.path().to_path_buf();
69 expire_file.push("expire");
70 if expire_file.exists() {
71 if let Ok(md) = expire_file.metadata() {
72 if let Ok(m) = md.modified() {
73 access_at = m.duration_since(UNIX_EPOCH).unwrap().as_secs();
74 let mut file = std::fs::File::open(expire_file).unwrap();
75 let mut buf = [0u8; 8];
76 file.read(&mut buf).unwrap();
77 expire = i64::from_be_bytes(buf);
78 }
79 }
80 }
81 sessions.push(SessionInfo {
82 name: fname.to_owned(),
83 access_at,
84 expire,
85 });
86 }
87 }
88 }
89 }
90 sessions
91 }
92 pub fn session_gc(&self, default_expire_interval_sec: i64) {
93 for session in self.sessions().into_iter() {
94 let expire = if session.expire < 0 {
95 default_expire_interval_sec
96 } else {
97 session.expire
98 };
99 if session.access_at
100 < (time::SystemTime::now() - time::Duration::new(expire as u64, 0))
101 .duration_since(UNIX_EPOCH)
102 .unwrap()
103 .as_secs()
104 {
105 let mut path = self.sessions_dir.clone();
106 path.push(session.name);
107 std::fs::remove_dir_all(&path).unwrap();
108 }
109 }
110 }
111
112 pub fn session(&self, session_name: &str, expire_interval_sec: Option<i64>) -> Session {
113 let session_dir = self.session_dir(session_name);
114 if !session_dir.exists() {
115 std::fs::create_dir_all(&session_dir).unwrap();
116 }
117 Session::new(self, session_name, expire_interval_sec)
118 }
119 pub fn session_dir(&self, session_name: &str) -> PathBuf {
120 let mut dir = self.sessions_dir.clone();
121 dir.push(session_name);
122 dir
123 }
124 fn delete_dir(dir: PathBuf) {
125 for d in dir.read_dir().unwrap().into_iter() {
126 let d = d.unwrap();
127 if d.file_type().unwrap().is_dir() {
128 let dir = d.path();
129 Self::delete_dir(dir);
130 } else {
131 let file = d.path();
132 std::fs::remove_file(file).unwrap();
133 }
134 }
135 let _ = std::fs::remove_dir_all(dir);
136 }
137 pub fn session_clear(&self, session: &mut Session) {
138 let session_dir = self.session_dir(session.name());
139 session.session_data = None;
140 if session_dir.exists() {
141 Self::delete_dir(session_dir);
142 }
143 session.temporary_data.clear();
144 }
145
146 pub fn session_restart(&self, session: &mut Session, expire_interval_sec: Option<i64>) {
147 self.session_clear(session);
148 self.init_session(session, expire_interval_sec)
149 }
150
151 fn init_session(&self, session: &mut Session, expire_interval_sec: Option<i64>) {
152 let session_dir = self.session_dir(session.name());
153 std::fs::create_dir_all(&session_dir).unwrap();
154 let session_data = Session::new_data(&session_dir, expire_interval_sec);
155 let temporary_data = session_data.init_temporary_data();
156 session.session_data = Some(session_data);
157 session.temporary_data = temporary_data;
158 }
159
160 pub async fn update(
161 &self,
162 session: &mut Session,
163 records: Vec<SessionRecord>,
164 ) -> Vec<CollectionRow> {
165 let mut ret = vec![];
166 let session_dir = self.session_dir(session.name());
167 if let None = session.session_data {
168 self.init_session(session, None);
169 }
170 if let Some(ref mut session_data) = session.session_data {
171 let current = session_data.sequence_number.current();
172 let max = session_data.sequence_number.max();
173 if current < max {
174 for row in ((current + 1)..=max).rev() {
175 for session_row in session_data
176 .sequence
177 .iter_by(&row)
178 .collect::<Vec<_>>()
179 .into_iter()
180 {
181 futures::join!(
182 session_data.relation.delete(session_row),
183 async {
184 session_data.collection_id.delete(session_row);
185 },
186 async {
187 session_data.row.delete(session_row);
188 },
189 async {
190 session_data.operation.delete(session_row);
191 },
192 async {
193 session_data.activity.delete(session_row);
194 },
195 async {
196 session_data.term_begin.delete(session_row);
197 },
198 async {
199 session_data.term_end.delete(session_row);
200 },
201 async {
202 session_data.uuid.delete(session_row);
203 },
204 {
205 let mut fs = vec![];
206 for (_field_name, field_data) in session_data.fields.iter_mut() {
207 fs.push(async { field_data.delete(session_row) });
208 }
209 futures::future::join_all(fs)
210 },
211 async {
212 session_data.sequence.delete(session_row);
213 }
214 );
215 }
216 }
217 }
218
219 let sequence = session_data.sequence_number.next();
220 ret.extend(
221 self.update_recursive(
222 session_data,
223 &mut session.temporary_data,
224 &session_dir,
225 &sequence,
226 &records,
227 None,
228 )
229 .await,
230 );
231 }
232 ret
233 }
234
235 pub fn depends_with_session(
236 &self,
237 key: Option<Arc<String>>,
238 pend_collection_id: NonZeroI32,
239 pend_row: NonZeroI64,
240 session: Option<&Session>,
241 ) -> Vec<Depend> {
242 let pend_row = pend_row.get();
243 if pend_row < 0 {
244 if let Some(session) = session {
245 if let Some(session_depends) = session.depends(key, unsafe {
246 NonZeroU32::new_unchecked((-pend_row) as u32)
247 }) {
248 return session_depends;
249 }
250 }
251 vec![]
252 } else if pend_collection_id.get() > 0 {
253 self.relation()
254 .depends(
255 key,
256 &CollectionRow::new(pend_collection_id, unsafe {
257 NonZeroU32::new_unchecked(pend_row as u32)
258 }),
259 )
260 .into_iter()
261 .collect()
262 } else {
263 self.relation()
264 .depends(
265 key,
266 &CollectionRow::new(-pend_collection_id, unsafe {
267 NonZeroU32::new_unchecked(pend_row as u32)
268 }),
269 )
270 .into_iter()
271 .collect()
272 }
273 }
274
275 pub async fn register_relations_with_session(
276 &mut self,
277 depend: &CollectionRow,
278 pends: Vec<(Arc<String>, CollectionRow)>,
279 row_map: &HashMap<CollectionRow, CollectionRow>,
280 ) {
281 for (key_name, pend) in pends.iter() {
282 if pend.collection_id().get() < 0 {
283 if let Some(pend) = row_map.get(pend) {
284 self.register_relation(&key_name, depend, pend).await;
285 }
286 } else {
287 self.register_relation(&key_name, depend, pend).await;
288 }
289 }
290 }
291}