1mod types;
15
16use std::collections::HashSet;
17use std::path::{Path, PathBuf};
18use std::sync::atomic::{AtomicU64, Ordering};
19use std::time::{SystemTime, UNIX_EPOCH};
20
21use anyhow::{anyhow, bail, Context, Result};
22
23use crate::link::Link;
24use crate::named_types::{NamedTypes, NamedTypesDecorator};
25
26pub use types::{CommitMode, DoubletLink, LogRetentionPolicy, Transition, TransitionKind};
27use types::{
28 APPLIED_MARKER_PREFIX, COMMIT_MARKER_PREFIX, ROLLBACK_MARKER_PREFIX, TRANSITION_NAME_PREFIX,
29};
30
31struct PendingTransaction {
34 id: u128,
35 transitions: Vec<Transition>,
36 auto_commit: bool,
37 started_ms: i64,
38}
39
/// Lightweight description of a transaction opened with `begin_transaction`.
///
/// Both fields are plain integers, so the handle is cheap to copy and can be
/// compared for equality.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TransactionHandle {
    /// Identifier of the transaction.
    pub id: u128,
    /// Milliseconds since the Unix epoch at which the transaction was opened.
    pub started_ms: i64,
}
46
47pub struct TransactionsDecorator {
50 inner: NamedTypesDecorator,
51 log_store: NamedTypesDecorator,
52 log: Vec<Transition>,
53 committed: HashSet<u128>,
54 rolled_back: HashSet<u128>,
55 applied: HashSet<i64>,
56 current: Option<PendingTransaction>,
57 sequence_counter: i64,
58 applied_sequence: i64,
59 retention_policy: LogRetentionPolicy,
60 commit_mode: CommitMode,
61 replaying: bool,
62 trace: bool,
63}
64
65impl TransactionsDecorator {
66 pub fn new(
69 inner: NamedTypesDecorator,
70 log_store: NamedTypesDecorator,
71 retention_policy: LogRetentionPolicy,
72 commit_mode: CommitMode,
73 trace: bool,
74 ) -> Result<Self> {
75 let mut decorator = Self {
76 inner,
77 log_store,
78 log: Vec::new(),
79 committed: HashSet::new(),
80 rolled_back: HashSet::new(),
81 applied: HashSet::new(),
82 current: None,
83 sequence_counter: 0,
84 applied_sequence: 0,
85 retention_policy,
86 commit_mode,
87 replaying: false,
88 trace,
89 };
90 decorator.recover()?;
91 Ok(decorator)
92 }
93
/// Derives the path of the transitions database that belongs to
/// `database_filename`.
///
/// The result lives in the same directory as the input and is named
/// `<stem>.transitions.links`, where `<stem>` is the input's file stem.
/// The stem is carried over as an OS string, so names that are not valid
/// UTF-8 are preserved instead of being replaced by an empty stem.
/// When the input has no file stem the name is `.transitions.links`; when it
/// has no (or an empty) parent, only the file name is returned.
pub fn make_transitions_database_filename<P: AsRef<Path>>(database_filename: P) -> PathBuf {
    let path = database_filename.as_ref();
    let mut name = path
        .file_stem()
        .map(|stem| stem.to_os_string())
        .unwrap_or_default();
    name.push(".transitions.links");
    match path.parent() {
        Some(parent) if !parent.as_os_str().is_empty() => parent.join(name),
        _ => PathBuf::from(name),
    }
}
107
108 pub fn retention_policy(&self) -> &LogRetentionPolicy {
109 &self.retention_policy
110 }
111
112 pub fn set_retention_policy(&mut self, policy: LogRetentionPolicy) {
113 self.retention_policy = policy;
114 }
115
116 pub fn commit_mode(&self) -> CommitMode {
117 self.commit_mode
118 }
119
120 pub fn set_commit_mode(&mut self, mode: CommitMode) {
121 self.commit_mode = mode;
122 }
123
124 pub fn applied_sequence(&self) -> i64 {
125 self.applied_sequence
126 }
127
128 pub fn last_logged_sequence(&self) -> i64 {
129 self.sequence_counter
130 }
131
132 pub fn log(&self) -> Vec<Transition> {
134 self.log.clone()
135 }
136
137 pub fn inner(&self) -> &NamedTypesDecorator {
138 &self.inner
139 }
140
141 pub fn inner_mut(&mut self) -> &mut NamedTypesDecorator {
142 &mut self.inner
143 }
144
145 pub fn log_store(&self) -> &NamedTypesDecorator {
146 &self.log_store
147 }
148
149 pub fn log_store_mut(&mut self) -> &mut NamedTypesDecorator {
150 &mut self.log_store
151 }
152
153 pub fn into_inner(self) -> (NamedTypesDecorator, NamedTypesDecorator) {
154 (self.inner, self.log_store)
155 }
156
157 pub fn save(&self) -> Result<()> {
158 self.inner.save()?;
159 self.log_store.save()?;
160 Ok(())
161 }
162
163 pub fn create(&mut self, source: u32, target: u32) -> Result<u32> {
166 if self.replaying {
167 return Ok(self.inner.create(source, target));
168 }
169 let owns = self.ensure_open_transaction();
170 let id = self.inner.create(source, target);
171 let after = self
172 .inner
173 .get(id)
174 .map(DoubletLink::from_link)
175 .unwrap_or_else(|| DoubletLink::new(id, source, target));
176 self.record_transition(TransitionKind::Create, DoubletLink::empty(), after)?;
177 if owns {
178 self.commit_current()?;
179 }
180 Ok(id)
181 }
182
183 pub fn update(&mut self, id: u32, source: u32, target: u32) -> Result<Link> {
184 if self.replaying {
185 return self.inner.update(id, source, target);
186 }
187 let before = self
188 .inner
189 .get(id)
190 .map(DoubletLink::from_link)
191 .unwrap_or_else(|| DoubletLink::new(id, 0, 0));
192 let owns = self.ensure_open_transaction();
193 let prev = match self.inner.update(id, source, target) {
194 Ok(prev) => prev,
195 Err(err) => {
196 if owns {
197 self.current = None;
198 }
199 return Err(err);
200 }
201 };
202 let after = self
203 .inner
204 .get(id)
205 .map(DoubletLink::from_link)
206 .unwrap_or_else(|| DoubletLink::new(id, source, target));
207 self.record_transition(TransitionKind::Update, before, after)?;
208 if owns {
209 self.commit_current()?;
210 }
211 Ok(prev)
212 }
213
214 pub fn delete(&mut self, id: u32) -> Result<Link> {
215 if self.replaying {
216 return self.inner.delete(id);
217 }
218 let before = self
219 .inner
220 .get(id)
221 .map(DoubletLink::from_link)
222 .unwrap_or_else(|| DoubletLink::new(id, 0, 0));
223 let owns = self.ensure_open_transaction();
224 let deleted = match self.inner.delete(id) {
225 Ok(d) => d,
226 Err(err) => {
227 if owns {
228 self.current = None;
229 }
230 return Err(err);
231 }
232 };
233 self.record_transition(TransitionKind::Delete, before, DoubletLink::empty())?;
234 if owns {
235 self.commit_current()?;
236 }
237 Ok(deleted)
238 }
239
240 pub fn create_and_update(&mut self, source: u32, target: u32) -> Result<u32> {
245 let owns = self.ensure_open_transaction();
246 let id = self.create(0, 0)?;
247 self.update(id, source, target)?;
248 if owns {
249 self.commit_current()?;
250 }
251 Ok(id)
252 }
253
254 pub fn exists(&self, id: u32) -> bool {
255 self.inner.exists(id)
256 }
257
258 pub fn get(&self, id: u32) -> Option<&Link> {
259 self.inner.get(id)
260 }
261
262 pub fn all(&self) -> Vec<&Link> {
263 self.inner.all()
264 }
265
266 pub fn query(
267 &self,
268 index: Option<u32>,
269 source: Option<u32>,
270 target: Option<u32>,
271 ) -> Vec<&Link> {
272 self.inner.query(index, source, target)
273 }
274
275 pub fn search(&self, source: u32, target: u32) -> Option<u32> {
276 self.inner.search(source, target)
277 }
278
279 pub fn get_or_create(&mut self, source: u32, target: u32) -> Result<u32> {
280 if let Some(existing) = self.inner.search(source, target) {
281 return Ok(existing);
282 }
283 self.create(source, target)
284 }
285
286 pub fn ensure_created(&mut self, id: u32) -> u32 {
287 self.inner.ensure_created(id)
290 }
291
292 fn ensure_open_transaction(&mut self) -> bool {
293 if self.current.is_none() {
294 self.current = Some(PendingTransaction {
295 id: new_transaction_id(),
296 transitions: Vec::new(),
297 auto_commit: true,
298 started_ms: now_unix_ms(),
299 });
300 true
301 } else {
302 false
303 }
304 }
305
306 fn record_transition(
307 &mut self,
308 kind: TransitionKind,
309 before: DoubletLink,
310 after: DoubletLink,
311 ) -> Result<()> {
312 self.sequence_counter += 1;
313 let sequence = self.sequence_counter;
314 let timestamp_ms = now_unix_ms();
315 let transaction_id = self.current.as_ref().map(|tx| tx.id).ok_or_else(|| {
316 anyhow!("internal: missing open transaction while recording transition")
317 })?;
318 let transition = Transition {
319 transaction_id,
320 sequence,
321 timestamp_ms,
322 kind,
323 before,
324 after,
325 };
326 if let Some(current) = self.current.as_mut() {
327 current.transitions.push(transition);
328 }
329 self.log.push(transition);
330 self.write_transition_to_log(&transition)?;
331 if self.trace {
332 eprintln!(
333 "[Transactions] Recorded {:?} seq={} tx={:032x}: ({},{},{}) -> ({},{},{}).",
334 kind,
335 sequence,
336 transaction_id,
337 before.index,
338 before.source,
339 before.target,
340 after.index,
341 after.source,
342 after.target,
343 );
344 }
345 Ok(())
346 }
347
348 fn write_transition_to_log(&mut self, transition: &Transition) -> Result<()> {
349 let link = self.log_store.create(0, 0);
352 let name = format!("{TRANSITION_NAME_PREFIX}{}", transition.serialize());
353 self.log_store.set_name(link, &name)?;
354 Ok(())
355 }
356
357 fn write_marker(&mut self, name: &str) -> Result<()> {
358 let link = self.log_store.create(0, 0);
361 self.log_store.set_name(link, name)?;
362 Ok(())
363 }
364
365 pub fn begin_transaction(&mut self) -> Result<TransactionHandle> {
368 if self.current.is_some() {
369 bail!("Nested transactions are not supported.");
370 }
371 let id = new_transaction_id();
372 let started_ms = now_unix_ms();
373 self.current = Some(PendingTransaction {
374 id,
375 transitions: Vec::new(),
376 auto_commit: false,
377 started_ms,
378 });
379 Ok(TransactionHandle { id, started_ms })
380 }
381
382 pub fn commit(&mut self) -> Result<()> {
383 if self.current.is_none() {
384 return Ok(());
385 }
386 self.commit_current()
387 }
388
389 fn commit_current(&mut self) -> Result<()> {
390 let pending = match self.current.take() {
391 Some(p) => p,
392 None => return Ok(()),
393 };
394 self.committed.insert(pending.id);
395 self.write_marker(&format!("{COMMIT_MARKER_PREFIX}{:032x}", pending.id))?;
396 if self.trace {
397 eprintln!(
398 "[Transactions] Committed tx {:032x} (mode={:?}, transitions={}).",
399 pending.id,
400 self.commit_mode,
401 pending.transitions.len()
402 );
403 }
404 for transition in &pending.transitions {
405 self.mark_applied(transition)?;
406 }
407 let _ = pending.auto_commit;
408 let _ = pending.started_ms;
409 self.enforce_retention()?;
410 Ok(())
411 }
412
413 pub fn rollback(&mut self) -> Result<()> {
414 let pending = match self.current.take() {
415 Some(p) => p,
416 None => return Ok(()),
417 };
418 self.rolled_back.insert(pending.id);
419 self.replaying = true;
420 for transition in pending.transitions.iter().rev() {
421 self.try_revert_transition(transition);
422 }
423 self.replaying = false;
424 self.write_marker(&format!("{ROLLBACK_MARKER_PREFIX}{:032x}", pending.id))?;
425 if self.trace {
426 eprintln!(
427 "[Transactions] Rolled back tx {:032x} ({} transitions).",
428 pending.id,
429 pending.transitions.len(),
430 );
431 }
432 self.enforce_retention()?;
433 Ok(())
434 }
435
436 pub fn apply_transition(&mut self, transition: &Transition) {
439 self.replaying = true;
440 self.try_apply_transition(transition, false);
441 self.replaying = false;
442 }
443
444 pub fn revert_transition(&mut self, transition: &Transition) {
447 self.replaying = true;
448 self.try_revert_transition(transition);
449 self.replaying = false;
450 }
451
452 fn try_apply_transition(&mut self, transition: &Transition, record_applied: bool) {
453 let result: Result<()> = match transition.kind {
454 TransitionKind::Create => {
455 if transition.after.index != 0 && !self.inner.exists(transition.after.index) {
456 self.inner.ensure_created(transition.after.index);
457 self.inner
458 .update(
459 transition.after.index,
460 transition.after.source,
461 transition.after.target,
462 )
463 .map(|_| ())
464 } else {
465 Ok(())
466 }
467 }
468 TransitionKind::Update => {
469 if transition.after.index != 0 && self.inner.exists(transition.after.index) {
470 self.inner
471 .update(
472 transition.after.index,
473 transition.after.source,
474 transition.after.target,
475 )
476 .map(|_| ())
477 } else {
478 Ok(())
479 }
480 }
481 TransitionKind::Delete => {
482 if transition.before.index != 0 && self.inner.exists(transition.before.index) {
483 self.inner.delete(transition.before.index).map(|_| ())
484 } else {
485 Ok(())
486 }
487 }
488 };
489 if let Err(e) = result {
490 if self.trace {
491 eprintln!(
492 "[Transactions] Failed to apply transition seq={}: {e}",
493 transition.sequence
494 );
495 }
496 }
497 if record_applied {
498 let _ = self.mark_applied(transition);
499 }
500 }
501
502 fn try_revert_transition(&mut self, transition: &Transition) {
503 let result = match transition.kind {
504 TransitionKind::Create => {
505 if transition.after.index != 0 && self.inner.exists(transition.after.index) {
506 self.inner.delete(transition.after.index).map(|_| ())
507 } else {
508 Ok(())
509 }
510 }
511 TransitionKind::Update => {
512 if transition.before.index != 0 && self.inner.exists(transition.before.index) {
513 self.inner
514 .update(
515 transition.before.index,
516 transition.before.source,
517 transition.before.target,
518 )
519 .map(|_| ())
520 } else {
521 Ok(())
522 }
523 }
524 TransitionKind::Delete => {
525 if transition.before.index != 0 && !self.inner.exists(transition.before.index) {
526 self.inner.ensure_created(transition.before.index);
527 self.inner
528 .update(
529 transition.before.index,
530 transition.before.source,
531 transition.before.target,
532 )
533 .map(|_| ())
534 } else {
535 Ok(())
536 }
537 }
538 };
539 if let Err(e) = result {
540 if self.trace {
541 eprintln!(
542 "[Transactions] Failed to revert transition seq={}: {e}",
543 transition.sequence
544 );
545 }
546 }
547 }
548
549 fn mark_applied(&mut self, transition: &Transition) -> Result<()> {
550 if self.applied.insert(transition.sequence) {
551 self.write_marker(&format!("{APPLIED_MARKER_PREFIX}{}", transition.sequence))?;
552 if transition.sequence > self.applied_sequence {
553 self.applied_sequence = transition.sequence;
554 }
555 }
556 Ok(())
557 }
558
559 pub fn recover(&mut self) -> Result<()> {
564 self.log.clear();
565 self.committed.clear();
566 self.rolled_back.clear();
567 self.applied.clear();
568 self.sequence_counter = 0;
569 self.applied_sequence = 0;
570
571 let all_links: Vec<Link> = self.log_store.all().into_iter().copied().collect();
573 for link in &all_links {
574 let name = match self.log_store.get_name(link.index)? {
575 Some(value) => value,
576 None => continue,
577 };
578 if let Some(payload) = name.strip_prefix(TRANSITION_NAME_PREFIX) {
579 if let Some(transition) = Transition::try_parse(payload) {
580 insert_ordered(&mut self.log, transition);
581 if transition.sequence > self.sequence_counter {
582 self.sequence_counter = transition.sequence;
583 }
584 }
585 } else if let Some(rest) = name.strip_prefix(COMMIT_MARKER_PREFIX) {
586 if let Ok(tx_id) = u128::from_str_radix(rest, 16) {
587 self.committed.insert(tx_id);
588 }
589 } else if let Some(rest) = name.strip_prefix(ROLLBACK_MARKER_PREFIX) {
590 if let Ok(tx_id) = u128::from_str_radix(rest, 16) {
591 self.rolled_back.insert(tx_id);
592 }
593 } else if let Some(rest) = name.strip_prefix(APPLIED_MARKER_PREFIX) {
594 if let Ok(seq) = rest.parse::<i64>() {
595 self.applied.insert(seq);
596 if seq > self.applied_sequence {
597 self.applied_sequence = seq;
598 }
599 }
600 }
601 }
602
603 let log_snapshot: Vec<Transition> = self.log.clone();
605 self.replaying = true;
606 for transition in &log_snapshot {
607 if !self.committed.contains(&transition.transaction_id) {
608 continue;
609 }
610 if self.applied.contains(&transition.sequence) {
611 continue;
612 }
613 self.try_apply_transition(transition, true);
614 }
615 let mut pending_tx_ids: Vec<u128> = Vec::new();
617 for transition in log_snapshot.iter().rev() {
618 if self.committed.contains(&transition.transaction_id) {
619 continue;
620 }
621 if self.rolled_back.contains(&transition.transaction_id) {
622 continue;
623 }
624 self.try_revert_transition(transition);
625 if !pending_tx_ids.contains(&transition.transaction_id) {
626 pending_tx_ids.push(transition.transaction_id);
627 }
628 }
629 self.replaying = false;
630 for tx_id in pending_tx_ids {
631 self.rolled_back.insert(tx_id);
632 self.write_marker(&format!("{ROLLBACK_MARKER_PREFIX}{tx_id:032x}"))?;
633 }
634 Ok(())
635 }
636
637 fn enforce_retention(&mut self) -> Result<()> {
638 match self.retention_policy.clone() {
639 LogRetentionPolicy::Infinite => Ok(()),
640 LogRetentionPolicy::Sized { max_transitions } => self.enforce_sized(max_transitions),
641 LogRetentionPolicy::Chunked {
642 chunk_size,
643 archive_directory,
644 } => self.enforce_chunked(chunk_size, &archive_directory),
645 }
646 }
647
648 fn enforce_sized(&mut self, max_transitions: u64) -> Result<()> {
649 if max_transitions == 0 {
650 return Ok(());
651 }
652 while self.log.len() as u64 > max_transitions {
653 let head = self.log[0];
654 if !self.applied.contains(&head.sequence) {
655 self.replaying = true;
656 self.try_apply_transition(&head, true);
657 self.replaying = false;
658 if !self.applied.contains(&head.sequence) {
659 break; }
661 }
662 self.log.remove(0);
663 if self.trace {
664 eprintln!(
665 "[Transactions] Dropped applied transition seq={} per sized retention.",
666 head.sequence
667 );
668 }
669 }
670 Ok(())
671 }
672
673 fn enforce_chunked(&mut self, chunk_size: u64, archive_directory: &Path) -> Result<()> {
674 if chunk_size == 0 {
675 return Ok(());
676 }
677 if (self.log.len() as u64) < chunk_size {
678 return Ok(());
679 }
680 let chunk: Vec<Transition> = self.log.iter().take(chunk_size as usize).copied().collect();
681 for transition in &chunk {
682 if !self.applied.contains(&transition.sequence) {
683 self.replaying = true;
684 self.try_apply_transition(transition, true);
685 self.replaying = false;
686 if !self.applied.contains(&transition.sequence) {
687 return Ok(()); }
689 }
690 }
691 std::fs::create_dir_all(archive_directory).with_context(|| {
692 format!(
693 "failed to create archive dir {}",
694 archive_directory.display()
695 )
696 })?;
697 let timestamp = now_unix_ms();
698 let file_name = format!(
699 "transitions-chunk-{timestamp}-{:032x}.log",
700 new_transaction_id()
701 );
702 let path = archive_directory.join(file_name);
703 use std::io::Write;
704 let mut file = std::fs::File::create(&path)
705 .with_context(|| format!("failed to create archive file {}", path.display()))?;
706 for transition in &chunk {
707 writeln!(file, "{}", transition.serialize())?;
708 }
709 file.flush()?;
710 if self.trace {
711 eprintln!(
712 "[Transactions] Archived {} transitions to {}.",
713 chunk.len(),
714 path.display()
715 );
716 }
717 self.log.drain(0..chunk.len());
718 Ok(())
719 }
720}
721
722fn insert_ordered(list: &mut Vec<Transition>, transition: Transition) {
725 let mut lo = 0usize;
726 let mut hi = list.len();
727 while lo < hi {
728 let mid = (lo + hi) / 2;
729 if list[mid].sequence < transition.sequence {
730 lo = mid + 1;
731 } else {
732 hi = mid;
733 }
734 }
735 list.insert(lo, transition);
736}
737
738static TX_COUNTER: AtomicU64 = AtomicU64::new(0);
739
740fn new_transaction_id() -> u128 {
741 let count = TX_COUNTER.fetch_add(1, Ordering::Relaxed) as u128;
744 let now = now_unix_ms() as u128;
745 (now << 64) | count
746}
747
/// Returns the current wall-clock time as milliseconds since the Unix epoch,
/// or `0` if the system clock reports a time before the epoch.
fn now_unix_ms() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}
754
#[cfg(test)]
mod tests {
    use super::*;

    /// Each supported retention spec parses to the matching variant and an
    /// unknown spec is rejected.
    #[test]
    fn retention_policy_parses_specs() {
        let infinite = LogRetentionPolicy::parse("infinite").unwrap();
        assert!(matches!(infinite, LogRetentionPolicy::Infinite));

        let sized = LogRetentionPolicy::parse("sized:1000").unwrap();
        assert!(matches!(
            sized,
            LogRetentionPolicy::Sized {
                max_transitions: 1000
            }
        ));

        let chunked = LogRetentionPolicy::parse("chunked:500:/tmp/x").unwrap();
        if let LogRetentionPolicy::Chunked {
            chunk_size,
            archive_directory,
        } = chunked
        {
            assert_eq!(chunk_size, 500);
            assert_eq!(archive_directory, PathBuf::from("/tmp/x"));
        } else {
            panic!("expected Chunked");
        }

        assert!(LogRetentionPolicy::parse("garbage").is_err());
    }

    /// Serializing a transition and parsing it back yields an equal value.
    #[test]
    fn transition_round_trips_through_serialize() {
        let original = Transition {
            transaction_id: 0xabcdef1234567890u128,
            sequence: 42,
            timestamp_ms: 1234567890,
            kind: TransitionKind::Update,
            before: DoubletLink::new(1, 2, 3),
            after: DoubletLink::new(1, 4, 5),
        };
        let serialized = original.serialize();
        let parsed = Transition::try_parse(&serialized).unwrap();
        assert_eq!(original, parsed);
    }
}