surrealdb_core/sql/statements/
kill.rs

1use std::fmt;
2
3use derive::Store;
4use reblessive::tree::Stk;
5use revision::revisioned;
6use serde::{Deserialize, Serialize};
7
8use crate::ctx::Context;
9use crate::dbs::Options;
10use crate::doc::CursorDoc;
11use crate::err::Error;
12use crate::fflags::FFLAGS;
13use crate::kvs::lq_structs::{KillEntry, TrackedResult};
14use crate::sql::Uuid;
15use crate::sql::Value;
16
17#[revisioned(revision = 1)]
18#[derive(Clone, Debug, Default, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)]
19#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
20#[non_exhaustive]
21pub struct KillStatement {
22	// Uuid of Live Query
23	// or Param resolving to Uuid of Live Query
24	pub id: Value,
25}
26
27impl KillStatement {
28	/// Process this type returning a computed simple Value
29	pub(crate) async fn compute(
30		&self,
31		stk: &mut Stk,
32		ctx: &Context<'_>,
33		opt: &Options,
34		_doc: Option<&CursorDoc<'_>>,
35	) -> Result<Value, Error> {
36		// Is realtime enabled?
37		opt.realtime()?;
38		// Valid options?
39		opt.valid_for_db()?;
40		// Resolve live query id
41		let live_query_id = match &self.id {
42			Value::Uuid(id) => *id,
43			Value::Param(param) => match param.compute(stk, ctx, opt, None).await? {
44				Value::Uuid(id) => id,
45				Value::Strand(id) => match uuid::Uuid::try_parse(&id) {
46					Ok(id) => Uuid(id),
47					_ => {
48						return Err(Error::KillStatement {
49							value:
50								"KILL received a parameter that could not be converted to a UUID"
51									.to_string(),
52						});
53					}
54				},
55				_ => {
56					return Err(Error::KillStatement {
57						value: "KILL received a parameter that was not expected".to_string(),
58					});
59				}
60			},
61			Value::Strand(maybe_id) => match uuid::Uuid::try_parse(maybe_id) {
62				Ok(id) => Uuid(id),
63				_ => {
64					return Err(Error::KillStatement {
65						value: "KILL received a Strand that could not be converted to a UUID"
66							.to_string(),
67					});
68				}
69			},
70			_ => {
71				return Err(Error::KillStatement {
72					value: "Unhandled type for KILL statement".to_string(),
73				});
74			}
75		};
76		// Claim transaction
77		let mut run = ctx.tx_lock().await;
78		if FFLAGS.change_feed_live_queries.enabled() {
79			run.pre_commit_register_async_event(TrackedResult::KillQuery(KillEntry {
80				live_id: live_query_id,
81				ns: opt.ns()?.to_string(),
82				db: opt.db()?.to_string(),
83			}))?;
84		} else {
85			// Fetch the live query key
86			let key = crate::key::node::lq::new(opt.id()?, live_query_id.0, opt.ns()?, opt.db()?);
87			// Fetch the live query key if it exists
88			match run.get(key).await? {
89				Some(val) => match std::str::from_utf8(&val) {
90					Ok(tb) => {
91						// Delete the node live query
92						let key = crate::key::node::lq::new(
93							opt.id()?,
94							live_query_id.0,
95							opt.ns()?,
96							opt.db()?,
97						);
98						run.del(key).await?;
99						// Delete the table live query
100						let key =
101							crate::key::table::lq::new(opt.ns()?, opt.db()?, tb, live_query_id.0);
102						run.del(key).await?;
103					}
104					_ => {
105						return Err(Error::KillStatement {
106							value: self.id.to_string(),
107						});
108					}
109				},
110				None => {
111					return Err(Error::KillStatement {
112						value: "KILL statement uuid did not exist".to_string(),
113					});
114				}
115			}
116		}
117		// Return the query id
118		Ok(Value::None)
119	}
120}
121
122impl fmt::Display for KillStatement {
123	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
124		write!(f, "KILL {}", self.id)
125	}
126}
127
128#[cfg(test)]
129mod test {
130	use std::str::FromStr;
131
132	use crate::ctx::Context;
133	use crate::dbs::Options;
134	use crate::fflags::FFLAGS;
135	use crate::kvs::lq_structs::{KillEntry, TrackedResult};
136	use crate::kvs::{Datastore, LockType, TransactionType};
137	use crate::sql::statements::KillStatement;
138	use crate::sql::uuid::Uuid;
139
140	#[test_log::test(tokio::test)]
141	async fn kill_handles_uuid_event_registration() {
142		if !FFLAGS.change_feed_live_queries.enabled() {
143			return;
144		}
145		let res = KillStatement {
146			id: Uuid::from_str("8f92f057-c739-4bf2-9d0c-a74d01299efc").unwrap().into(),
147		};
148		let ctx = Context::default();
149		let opt = Options::new()
150			.with_id(uuid::Uuid::from_str("8c41d9f7-a627-40f7-86f5-59d56cd765c6").unwrap())
151			.with_live(true)
152			.with_db(Some("database".into()))
153			.with_ns(Some("namespace".into()));
154		let ds = Datastore::new("memory").await.unwrap();
155		let tx =
156			ds.transaction(TransactionType::Write, LockType::Optimistic).await.unwrap().enclose();
157		let ctx = ctx.set_transaction(tx.clone());
158
159		let mut stack = reblessive::tree::TreeStack::new();
160
161		stack.enter(|stk| res.compute(stk, &ctx, &opt, None)).finish().await.unwrap();
162
163		let mut tx = tx.lock().await;
164		tx.commit().await.unwrap();
165
166		// Validate sent
167		assert_eq!(
168			tx.consume_pending_live_queries(),
169			vec![TrackedResult::KillQuery(KillEntry {
170				live_id: Uuid::from_str("8f92f057-c739-4bf2-9d0c-a74d01299efc").unwrap(),
171				ns: "namespace".to_string(),
172				db: "database".to_string(),
173			})]
174		);
175	}
176}