semilattice_database_session/
lib.rs

1mod commit;
2mod session;
3mod update;
4
5pub use semilattice_database::{
6    search, Activity, Collection, CollectionRow, Condition, CustomOrderKey, CustomSort, DataOption,
7    Depend, FieldName, Order, OrderKey, SearchResult, Term, Uuid,
8};
9pub use session::{
10    Depends, Pend, Session, SessionCustomOrder, SessionOrder, SessionOrderKey, SessionRecord,
11    SessionSearchResult,
12};
13
14use std::{
15    io::Read,
16    num::{NonZeroI32, NonZeroI64, NonZeroU32},
17    path::PathBuf,
18    sync::Arc,
19    time::{self, UNIX_EPOCH},
20};
21
22use hashbrown::HashMap;
23use semilattice_database::{idx_binary::AvltrieeUpdate, Database, Field, FileMmap, IdxFile};
24use session::SessionInfo;
25
26pub struct SessionDatabase {
27    database: Database,
28    sessions_dir: PathBuf,
29}
30
31impl std::ops::Deref for SessionDatabase {
32    type Target = Database;
33    fn deref(&self) -> &Self::Target {
34        &self.database
35    }
36}
37impl std::ops::DerefMut for SessionDatabase {
38    fn deref_mut(&mut self) -> &mut Self::Target {
39        &mut self.database
40    }
41}
42
43impl SessionDatabase {
44    pub fn new(
45        dir: PathBuf,
46        collection_settings: Option<std::collections::HashMap<String, DataOption>>,
47        relation_reserve_unit: u32,
48    ) -> Self {
49        let database = Database::new(dir.clone(), collection_settings, relation_reserve_unit);
50        let mut sessions_dir = dir.to_path_buf();
51        sessions_dir.push("sessions");
52        Self {
53            database,
54            sessions_dir,
55        }
56    }
57    pub fn sessions(&self) -> Vec<SessionInfo> {
58        let mut sessions = Vec::new();
59        if self.sessions_dir.exists() {
60            let dir = self.sessions_dir.read_dir().unwrap();
61            for d in dir.into_iter() {
62                let d = d.unwrap();
63                if d.file_type().unwrap().is_dir() {
64                    if let Some(fname) = d.file_name().to_str() {
65                        let mut access_at = 0;
66                        let mut expire = 0;
67
68                        let mut expire_file = d.path().to_path_buf();
69                        expire_file.push("expire");
70                        if expire_file.exists() {
71                            if let Ok(md) = expire_file.metadata() {
72                                if let Ok(m) = md.modified() {
73                                    access_at = m.duration_since(UNIX_EPOCH).unwrap().as_secs();
74                                    let mut file = std::fs::File::open(expire_file).unwrap();
75                                    let mut buf = [0u8; 8];
76                                    file.read(&mut buf).unwrap();
77                                    expire = i64::from_be_bytes(buf);
78                                }
79                            }
80                        }
81                        sessions.push(SessionInfo {
82                            name: fname.to_owned(),
83                            access_at,
84                            expire,
85                        });
86                    }
87                }
88            }
89        }
90        sessions
91    }
92    pub fn session_gc(&self, default_expire_interval_sec: i64) {
93        for session in self.sessions().into_iter() {
94            let expire = if session.expire < 0 {
95                default_expire_interval_sec
96            } else {
97                session.expire
98            };
99            if session.access_at
100                < (time::SystemTime::now() - time::Duration::new(expire as u64, 0))
101                    .duration_since(UNIX_EPOCH)
102                    .unwrap()
103                    .as_secs()
104            {
105                let mut path = self.sessions_dir.clone();
106                path.push(session.name);
107                std::fs::remove_dir_all(&path).unwrap();
108            }
109        }
110    }
111
112    pub fn session(&self, session_name: &str, expire_interval_sec: Option<i64>) -> Session {
113        let session_dir = self.session_dir(session_name);
114        if !session_dir.exists() {
115            std::fs::create_dir_all(&session_dir).unwrap();
116        }
117        Session::new(self, session_name, expire_interval_sec)
118    }
119    pub fn session_dir(&self, session_name: &str) -> PathBuf {
120        let mut dir = self.sessions_dir.clone();
121        dir.push(session_name);
122        dir
123    }
124    fn delete_dir(dir: PathBuf) {
125        for d in dir.read_dir().unwrap().into_iter() {
126            let d = d.unwrap();
127            if d.file_type().unwrap().is_dir() {
128                let dir = d.path();
129                Self::delete_dir(dir);
130            } else {
131                let file = d.path();
132                std::fs::remove_file(file).unwrap();
133            }
134        }
135        let _ = std::fs::remove_dir_all(dir);
136    }
137    pub fn session_clear(&self, session: &mut Session) {
138        let session_dir = self.session_dir(session.name());
139        session.session_data = None;
140        if session_dir.exists() {
141            Self::delete_dir(session_dir);
142        }
143        session.temporary_data.clear();
144    }
145
146    pub fn session_restart(&self, session: &mut Session, expire_interval_sec: Option<i64>) {
147        self.session_clear(session);
148        self.init_session(session, expire_interval_sec)
149    }
150
151    fn init_session(&self, session: &mut Session, expire_interval_sec: Option<i64>) {
152        let session_dir = self.session_dir(session.name());
153        std::fs::create_dir_all(&session_dir).unwrap();
154        let session_data = Session::new_data(&session_dir, expire_interval_sec);
155        let temporary_data = session_data.init_temporary_data();
156        session.session_data = Some(session_data);
157        session.temporary_data = temporary_data;
158    }
159
160    pub async fn update(
161        &self,
162        session: &mut Session,
163        records: Vec<SessionRecord>,
164    ) -> Vec<CollectionRow> {
165        let mut ret = vec![];
166        let session_dir = self.session_dir(session.name());
167        if let None = session.session_data {
168            self.init_session(session, None);
169        }
170        if let Some(ref mut session_data) = session.session_data {
171            let current = session_data.sequence_number.current();
172            let max = session_data.sequence_number.max();
173            if current < max {
174                for row in ((current + 1)..=max).rev() {
175                    for session_row in session_data
176                        .sequence
177                        .iter_by(&row)
178                        .collect::<Vec<_>>()
179                        .into_iter()
180                    {
181                        futures::join!(
182                            session_data.relation.delete(session_row),
183                            async {
184                                session_data.collection_id.delete(session_row);
185                            },
186                            async {
187                                session_data.row.delete(session_row);
188                            },
189                            async {
190                                session_data.operation.delete(session_row);
191                            },
192                            async {
193                                session_data.activity.delete(session_row);
194                            },
195                            async {
196                                session_data.term_begin.delete(session_row);
197                            },
198                            async {
199                                session_data.term_end.delete(session_row);
200                            },
201                            async {
202                                session_data.uuid.delete(session_row);
203                            },
204                            {
205                                let mut fs = vec![];
206                                for (_field_name, field_data) in session_data.fields.iter_mut() {
207                                    fs.push(async { field_data.delete(session_row) });
208                                }
209                                futures::future::join_all(fs)
210                            },
211                            async {
212                                session_data.sequence.delete(session_row);
213                            }
214                        );
215                    }
216                }
217            }
218
219            let sequence = session_data.sequence_number.next();
220            ret.extend(
221                self.update_recursive(
222                    session_data,
223                    &mut session.temporary_data,
224                    &session_dir,
225                    &sequence,
226                    &records,
227                    None,
228                )
229                .await,
230            );
231        }
232        ret
233    }
234
235    pub fn depends_with_session(
236        &self,
237        key: Option<Arc<String>>,
238        pend_collection_id: NonZeroI32,
239        pend_row: NonZeroI64,
240        session: Option<&Session>,
241    ) -> Vec<Depend> {
242        let pend_row = pend_row.get();
243        if pend_row < 0 {
244            if let Some(session) = session {
245                if let Some(session_depends) = session.depends(key, unsafe {
246                    NonZeroU32::new_unchecked((-pend_row) as u32)
247                }) {
248                    return session_depends;
249                }
250            }
251            vec![]
252        } else if pend_collection_id.get() > 0 {
253            self.relation()
254                .depends(
255                    key,
256                    &CollectionRow::new(pend_collection_id, unsafe {
257                        NonZeroU32::new_unchecked(pend_row as u32)
258                    }),
259                )
260                .into_iter()
261                .collect()
262        } else {
263            self.relation()
264                .depends(
265                    key,
266                    &CollectionRow::new(-pend_collection_id, unsafe {
267                        NonZeroU32::new_unchecked(pend_row as u32)
268                    }),
269                )
270                .into_iter()
271                .collect()
272        }
273    }
274
275    pub async fn register_relations_with_session(
276        &mut self,
277        depend: &CollectionRow,
278        pends: Vec<(Arc<String>, CollectionRow)>,
279        row_map: &HashMap<CollectionRow, CollectionRow>,
280    ) {
281        for (key_name, pend) in pends.iter() {
282            if pend.collection_id().get() < 0 {
283                if let Some(pend) = row_map.get(pend) {
284                    self.register_relation(&key_name, depend, pend).await;
285                }
286            } else {
287                self.register_relation(&key_name, depend, pend).await;
288            }
289        }
290    }
291}