1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
use crate::ctx::Context;
use crate::dbs::Options;
use crate::dbs::Statement;
use crate::doc::Document;
use crate::err::Error;
impl Document {
pub(super) async fn store_record_data(
&mut self,
ctx: &Context,
opt: &Options,
stm: &Statement<'_>,
) -> Result<(), Error> {
// Check if changed
if !self.changed() {
return Ok(());
}
// Check if the table is a view
if self.tb(ctx, opt).await?.drop {
return Ok(());
}
// Get the record id
let rid = self.id()?;
// Get NS & DB
let ns = opt.ns()?;
let db = opt.db()?;
// Store the record data
let key = crate::key::thing::new(ns, db, &rid.tb, &rid.id);
// Match the statement type
match stm {
// This is a INSERT statement so try to insert the key.
// For INSERT statements we don't first check for the
// entry from the storage engine, so when we attempt
// to store the record value, we presume that the key
// does not exist. If the record value exists then we
// attempt to run the ON DUPLICATE KEY UPDATE clause but
// at this point the current document is not empty so we
// set and update the key, without checking if the key
// already exists in the storage engine.
Statement::Insert(_) if self.is_iteration_initial() => {
match ctx.tx().put(key, &*self, opt.version).await {
// The key already exists, so return an error
Err(Error::TxKeyAlreadyExists) => Err(Error::RecordExists {
thing: rid.as_ref().to_owned(),
}),
// Return any other received error
Err(e) => Err(e),
// Record creation worked fine
Ok(v) => Ok(v),
}
}
// This is a UPSERT statement so try to insert the key.
// For UPSERT statements we don't first check for the
// entry from the storage engine, so when we attempt
// to store the record value, we must ensure that the
// key does not exist. If the record value exists then we
// retry and attempt to update the record which exists.
Statement::Upsert(_) if self.is_iteration_initial() => {
match ctx.tx().put(key, &*self, opt.version).await {
// The key already exists, so return an error
Err(Error::TxKeyAlreadyExists) => Err(Error::RecordExists {
thing: rid.as_ref().to_owned(),
}),
// Return any other received error
Err(e) => Err(e),
// Record creation worked fine
Ok(v) => Ok(v),
}
}
// This is a CREATE statement so try to insert the key.
// For CREATE statements we don't first check for the
// entry from the storage engine, so when we attempt
// to store the record value, we must ensure that the
// key does not exist. If it already exists, then we
// return an error, and the statement fails.
Statement::Create(_) => {
match ctx.tx().put(key, &*self, opt.version).await {
// The key already exists, so return an error
Err(Error::TxKeyAlreadyExists) => Err(Error::RecordExists {
thing: rid.as_ref().to_owned(),
}),
// Return any other received error
Err(e) => Err(e),
// Record creation worked fine
Ok(v) => Ok(v),
}
}
// Let's update the stored value for the specified key
_ => ctx.tx().set(key, &*self, opt.version).await,
}?;
// Update the cache
ctx.tx().set_record_cache(ns, db, &rid.tb, &rid.id, self.current.doc.as_arc())?;
// Carry on
Ok(())
}
}