surrealdb_core/sql/statements/
kill.rs1use std::fmt;
2
3use derive::Store;
4use reblessive::tree::Stk;
5use revision::revisioned;
6use serde::{Deserialize, Serialize};
7
8use crate::ctx::Context;
9use crate::dbs::Options;
10use crate::doc::CursorDoc;
11use crate::err::Error;
12use crate::fflags::FFLAGS;
13use crate::kvs::lq_structs::{KillEntry, TrackedResult};
14use crate::sql::Uuid;
15use crate::sql::Value;
16
17#[revisioned(revision = 1)]
18#[derive(Clone, Debug, Default, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)]
19#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
20#[non_exhaustive]
21pub struct KillStatement {
22 pub id: Value,
25}
26
27impl KillStatement {
28 pub(crate) async fn compute(
30 &self,
31 stk: &mut Stk,
32 ctx: &Context<'_>,
33 opt: &Options,
34 _doc: Option<&CursorDoc<'_>>,
35 ) -> Result<Value, Error> {
36 opt.realtime()?;
38 opt.valid_for_db()?;
40 let live_query_id = match &self.id {
42 Value::Uuid(id) => *id,
43 Value::Param(param) => match param.compute(stk, ctx, opt, None).await? {
44 Value::Uuid(id) => id,
45 Value::Strand(id) => match uuid::Uuid::try_parse(&id) {
46 Ok(id) => Uuid(id),
47 _ => {
48 return Err(Error::KillStatement {
49 value:
50 "KILL received a parameter that could not be converted to a UUID"
51 .to_string(),
52 });
53 }
54 },
55 _ => {
56 return Err(Error::KillStatement {
57 value: "KILL received a parameter that was not expected".to_string(),
58 });
59 }
60 },
61 Value::Strand(maybe_id) => match uuid::Uuid::try_parse(maybe_id) {
62 Ok(id) => Uuid(id),
63 _ => {
64 return Err(Error::KillStatement {
65 value: "KILL received a Strand that could not be converted to a UUID"
66 .to_string(),
67 });
68 }
69 },
70 _ => {
71 return Err(Error::KillStatement {
72 value: "Unhandled type for KILL statement".to_string(),
73 });
74 }
75 };
76 let mut run = ctx.tx_lock().await;
78 if FFLAGS.change_feed_live_queries.enabled() {
79 run.pre_commit_register_async_event(TrackedResult::KillQuery(KillEntry {
80 live_id: live_query_id,
81 ns: opt.ns()?.to_string(),
82 db: opt.db()?.to_string(),
83 }))?;
84 } else {
85 let key = crate::key::node::lq::new(opt.id()?, live_query_id.0, opt.ns()?, opt.db()?);
87 match run.get(key).await? {
89 Some(val) => match std::str::from_utf8(&val) {
90 Ok(tb) => {
91 let key = crate::key::node::lq::new(
93 opt.id()?,
94 live_query_id.0,
95 opt.ns()?,
96 opt.db()?,
97 );
98 run.del(key).await?;
99 let key =
101 crate::key::table::lq::new(opt.ns()?, opt.db()?, tb, live_query_id.0);
102 run.del(key).await?;
103 }
104 _ => {
105 return Err(Error::KillStatement {
106 value: self.id.to_string(),
107 });
108 }
109 },
110 None => {
111 return Err(Error::KillStatement {
112 value: "KILL statement uuid did not exist".to_string(),
113 });
114 }
115 }
116 }
117 Ok(Value::None)
119 }
120}
121
122impl fmt::Display for KillStatement {
123 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
124 write!(f, "KILL {}", self.id)
125 }
126}
127
#[cfg(test)]
mod test {
    use std::str::FromStr;

    use crate::ctx::Context;
    use crate::dbs::Options;
    use crate::fflags::FFLAGS;
    use crate::kvs::lq_structs::{KillEntry, TrackedResult};
    use crate::kvs::{Datastore, LockType, TransactionType};
    use crate::sql::statements::KillStatement;
    use crate::sql::uuid::Uuid;

    /// With the `change_feed_live_queries` flag enabled, computing a `KILL` for a UUID
    /// leaves exactly one matching `KillQuery` entry pending on the transaction.
    #[test_log::test(tokio::test)]
    async fn kill_handles_uuid_event_registration() {
        // This test only covers the flag-enabled code path.
        if !FFLAGS.change_feed_live_queries.enabled() {
            return;
        }

        const LIVE_ID: &str = "8f92f057-c739-4bf2-9d0c-a74d01299efc";
        const NODE_ID: &str = "8c41d9f7-a627-40f7-86f5-59d56cd765c6";

        let statement = KillStatement {
            id: Uuid::from_str(LIVE_ID).unwrap().into(),
        };
        let base_ctx = Context::default();
        let opt = Options::new()
            .with_id(uuid::Uuid::from_str(NODE_ID).unwrap())
            .with_live(true)
            .with_db(Some("database".into()))
            .with_ns(Some("namespace".into()));

        // Run against an in-memory datastore inside a write transaction.
        let datastore = Datastore::new("memory").await.unwrap();
        let txn = datastore
            .transaction(TransactionType::Write, LockType::Optimistic)
            .await
            .unwrap()
            .enclose();
        let ctx = base_ctx.set_transaction(txn.clone());

        let mut stack = reblessive::tree::TreeStack::new();
        stack.enter(|stk| statement.compute(stk, &ctx, &opt, None)).finish().await.unwrap();

        let mut guard = txn.lock().await;
        guard.commit().await.unwrap();

        let expected = vec![TrackedResult::KillQuery(KillEntry {
            live_id: Uuid::from_str(LIVE_ID).unwrap(),
            ns: "namespace".to_string(),
            db: "database".to_string(),
        })];
        assert_eq!(guard.consume_pending_live_queries(), expected);
    }
}