semilattice_database_session/commit.rs
1use std::{
2 num::{NonZeroI32, NonZeroU32},
3 sync::Arc,
4};
5
6use semilattice_database::{idx_binary::AvltrieeSearch, Activity, Term};
7
8use hashbrown::HashMap;
9
10use crate::{
11 session::{SessionData, SessionOperation},
12 CollectionRow, Session, SessionDatabase,
13};
14
15impl SessionDatabase {
16 pub async fn commit(&mut self, session: &mut Session) -> Vec<CollectionRow> {
17 if let Some(ref mut data) = session.session_data {
18 let r = self.commit_inner(data).await;
19 self.session_clear(session);
20 r
21 } else {
22 vec![]
23 }
24 }
25
26 async fn commit_inner(&mut self, session_data: &SessionData) -> Vec<CollectionRow> {
27 let mut commit_rows = Vec::new();
28
29 let mut session_collection_row_map: HashMap<CollectionRow, CollectionRow> = HashMap::new();
30 let mut relation_temporary: HashMap<CollectionRow, Vec<(Arc<String>, CollectionRow)>> =
31 HashMap::new();
32
33 for sequence in 1..=session_data.sequence_number.current() {
34 for session_row in session_data
35 .sequence
36 .iter_by(&sequence)
37 .collect::<Vec<_>>()
38 .into_iter()
39 .rev()
40 {
41 if let (Some(op), Some(collection_id), Some(row)) = (
42 session_data.operation.value(session_row).cloned(),
43 session_data.collection_id.value(session_row).cloned(),
44 session_data.row.value(session_row).cloned(),
45 ) {
46 let in_session = collection_id < 0;
47
48 let main_collection_id = NonZeroI32::new(if in_session {
49 -collection_id
50 } else {
51 collection_id
52 })
53 .unwrap();
54
55 if let Some(collection) = self.collection_mut(main_collection_id) {
56 let row = if row == 0 {
57 session_row
58 } else {
59 unsafe { NonZeroU32::new_unchecked(row) }
60 };
61
62 let fields = if op == SessionOperation::Delete {
63 HashMap::new()
64 } else {
65 session_data
66 .fields
67 .iter()
68 .filter_map(|(field_name, field_data)| {
69 field_data
70 .value(session_row)
71 .map(|val| (field_name.clone(), val.to_owned()))
72 })
73 .collect()
74 };
75
76 let session_collection_row =
77 CollectionRow::new(NonZeroI32::new(collection_id).unwrap(), row);
78 match op {
79 SessionOperation::New | SessionOperation::Update => {
80 let activity = if *unsafe {
81 session_data.activity.value_unchecked(session_row)
82 } == 1
83 {
84 Activity::Active
85 } else {
86 Activity::Inactive
87 };
88 let term_begin = Term::Overwrite(*unsafe {
89 session_data.term_begin.value_unchecked(session_row)
90 });
91 let term_end = Term::Overwrite(*unsafe {
92 session_data.term_end.value_unchecked(session_row)
93 });
94
95 let collection_row = CollectionRow::new(
96 main_collection_id,
97 if op == SessionOperation::New {
98 collection
99 .insert(activity, term_begin, term_end, fields)
100 .await
101 } else {
102 let row = if in_session {
103 let main_collection_row = session_collection_row_map
104 .get(&session_collection_row)
105 .unwrap();
106 main_collection_row.row()
107 } else {
108 row
109 };
110 collection
111 .update(row, activity, term_begin, term_end, fields)
112 .await;
113 row
114 },
115 );
116 commit_rows.push(collection_row.clone());
117 self.relation_mut()
118 .delete_pends_by_collection_row(&collection_row)
119 .await; //Delete once and re-register later
120
121 for relation_row in session_data
122 .relation
123 .rows
124 .session_row
125 .iter_by(&session_row.get())
126 {
127 if let (Some(key), Some(depend)) = (
128 session_data.relation.rows.key.value(relation_row),
129 session_data.relation.rows.depend.value(relation_row),
130 ) {
131 relation_temporary
132 .entry(depend.clone())
133 .or_insert_with(|| Vec::new())
134 .push((
135 Arc::new(
136 unsafe {
137 std::str::from_utf8_unchecked(
138 session_data
139 .relation
140 .key_names
141 .value(
142 NonZeroU32::new(*key).unwrap(),
143 )
144 .unwrap(),
145 )
146 }
147 .to_owned(),
148 ),
149 session_collection_row.clone(),
150 ));
151 }
152 }
153 session_collection_row_map
154 .insert(session_collection_row, collection_row);
155 }
156 SessionOperation::Delete => {
157 if in_session {
158 if let Some(registered) =
159 session_collection_row_map.get(&session_collection_row)
160 {
161 self.delete(registered).await;
162 }
163 } else {
164 self.delete(&CollectionRow::new(main_collection_id, row))
165 .await;
166 }
167 session_collection_row_map.remove(&session_collection_row);
168 }
169 }
170 }
171 }
172 }
173 }
174 for (depend, pends) in relation_temporary.into_iter() {
175 if depend.collection_id().get() < 0 {
176 if let Some(depend) = session_collection_row_map.get(&depend) {
177 self.register_relations_with_session(
178 depend,
179 pends,
180 &session_collection_row_map,
181 )
182 .await;
183 }
184 } else {
185 self.register_relations_with_session(&depend, pends, &session_collection_row_map)
186 .await;
187 }
188 }
189 commit_rows
190 }
191}