1use std::time::Duration;
2
3use crate::bson::{Bson, Document, RawDocumentBuf};
4
5use crate::{
6 bson_compat::RawResult,
7 client::session::TransactionState,
8 coll::options::CursorType,
9 db::options::{RunCommandOptions, RunCursorCommandOptions},
10 error::{ErrorKind, Result},
11 operation::{run_command, run_cursor_command},
12 selection_criteria::SelectionCriteria,
13 ClientSession,
14 Cursor,
15 Database,
16 SessionCursor,
17};
18
19use super::{
20 action_impl,
21 deeplink,
22 export_doc,
23 option_setters,
24 options_doc,
25 ExplicitSession,
26 ImplicitSession,
27};
28
29impl Database {
30 #[deeplink]
40 #[options_doc(run_command)]
41 pub fn run_command(&self, command: Document) -> RunCommand<'_> {
42 RunCommand {
43 db: self,
44 command: RawDocumentBuf::try_from(&command),
45 options: None,
46 session: None,
47 }
48 }
49
50 #[deeplink]
60 #[options_doc(run_command)]
61 pub fn run_raw_command(&self, command: RawDocumentBuf) -> RunCommand<'_> {
62 RunCommand {
63 db: self,
64 command: Ok(command),
65 options: None,
66 session: None,
67 }
68 }
69
70 #[deeplink]
75 #[options_doc(run_cursor_command)]
76 pub fn run_cursor_command(&self, command: Document) -> RunCursorCommand<'_> {
77 RunCursorCommand {
78 db: self,
79 command: RawDocumentBuf::try_from(&command),
80 options: None,
81 session: ImplicitSession,
82 }
83 }
84
85 #[deeplink]
90 #[options_doc(run_cursor_command)]
91 pub fn run_raw_cursor_command(&self, command: RawDocumentBuf) -> RunCursorCommand<'_> {
92 RunCursorCommand {
93 db: self,
94 command: Ok(command),
95 options: None,
96 session: ImplicitSession,
97 }
98 }
99}
100
#[cfg(feature = "sync")]
impl crate::sync::Database {
    /// Creates an action that runs `command` against this database.
    ///
    /// Delegates to `run_command` on the wrapped async database.
    ///
    /// `run` will return `Result<Document>`.
    #[deeplink]
    #[options_doc(run_command, "run")]
    pub fn run_command(&self, command: Document) -> RunCommand<'_> {
        let inner = &self.async_database;
        inner.run_command(command)
    }

    /// Creates an action that runs `command`, already in raw BSON form, against this database.
    ///
    /// Delegates to `run_raw_command` on the wrapped async database.
    ///
    /// `run` will return `Result<Document>`.
    #[deeplink]
    #[options_doc(run_command, "run")]
    pub fn run_raw_command(&self, command: RawDocumentBuf) -> RunCommand<'_> {
        let inner = &self.async_database;
        inner.run_raw_command(command)
    }

    /// Creates an action that runs `command` against this database and exposes the reply as a
    /// cursor.
    ///
    /// Delegates to `run_cursor_command` on the wrapped async database.
    ///
    /// `run` will return `Result<crate::sync::Cursor<Document>>`, or
    /// `Result<crate::sync::SessionCursor<Document>>` if a session is provided.
    #[deeplink]
    #[options_doc(run_cursor_command, "run")]
    pub fn run_cursor_command(&self, command: Document) -> RunCursorCommand<'_> {
        let inner = &self.async_database;
        inner.run_cursor_command(command)
    }

    /// Creates an action that runs `command`, already in raw BSON form, against this database and
    /// exposes the reply as a cursor.
    ///
    /// Delegates to `run_raw_cursor_command` on the wrapped async database.
    ///
    /// `run` will return `Result<crate::sync::Cursor<Document>>`, or
    /// `Result<crate::sync::SessionCursor<Document>>` if a session is provided.
    #[deeplink]
    #[options_doc(run_cursor_command, "run")]
    pub fn run_raw_cursor_command(&self, command: RawDocumentBuf) -> RunCursorCommand<'_> {
        let inner = &self.async_database;
        inner.run_raw_cursor_command(command)
    }
}
153
154#[must_use]
156pub struct RunCommand<'a> {
157 db: &'a Database,
158 command: RawResult<RawDocumentBuf>,
159 options: Option<RunCommandOptions>,
160 session: Option<&'a mut ClientSession>,
161}
162
163#[option_setters(crate::db::options::RunCommandOptions)]
164#[export_doc(run_command)]
165impl<'a> RunCommand<'a> {
166 pub fn session(mut self, value: impl Into<&'a mut ClientSession>) -> Self {
168 self.session = Some(value.into());
169 self
170 }
171}
172
173#[action_impl]
174impl<'a> Action for RunCommand<'a> {
175 type Future = RunCommandFuture;
176
177 async fn execute(self) -> Result<Document> {
178 let mut selection_criteria = self.options.and_then(|o| o.selection_criteria);
179 let command = self.command?;
180 if let Some(session) = &self.session {
181 match session.transaction.state {
182 TransactionState::Starting | TransactionState::InProgress => {
183 if command.get("readConcern").is_ok_and(|rc| rc.is_some()) {
184 return Err(ErrorKind::InvalidArgument {
185 message: "Cannot set read concern after starting a transaction".into(),
186 }
187 .into());
188 }
189 selection_criteria = match selection_criteria {
190 Some(selection_criteria) => Some(selection_criteria),
191 None => {
192 if let Some(ref options) = session.transaction.options {
193 options.selection_criteria.clone()
194 } else {
195 None
196 }
197 }
198 };
199 }
200 _ => {}
201 }
202 }
203
204 let operation =
205 run_command::RunCommand::new(self.db.clone(), command, selection_criteria, None);
206 self.db
207 .client()
208 .execute_operation(operation, self.session)
209 .await
210 }
211}
212
213#[must_use]
216pub struct RunCursorCommand<'a, Session = ImplicitSession> {
217 db: &'a Database,
218 command: RawResult<RawDocumentBuf>,
219 options: Option<RunCursorCommandOptions>,
220 session: Session,
221}
222
223#[option_setters(crate::db::options::RunCursorCommandOptions)]
224#[export_doc(run_cursor_command, extra = [session, batch])]
225impl<Session> RunCursorCommand<'_, Session> {}
226
227impl<'a> RunCursorCommand<'a, ImplicitSession> {
228 pub fn session(
230 self,
231 value: impl Into<&'a mut ClientSession>,
232 ) -> RunCursorCommand<'a, ExplicitSession<'a>> {
233 RunCursorCommand {
234 db: self.db,
235 command: self.command,
236 options: self.options,
237 session: ExplicitSession(value.into()),
238 }
239 }
240}
241
242#[action_impl(sync = crate::sync::Cursor<Document>)]
243impl<'a> Action for RunCursorCommand<'a, ImplicitSession> {
244 type Future = RunCursorCommandFuture;
245
246 async fn execute(self) -> Result<Cursor<Document>> {
247 self.exec_generic().await
248 }
249}
250
251impl<'a> RunCursorCommand<'a, ImplicitSession> {
252 async fn exec_generic<C: crate::cursor::NewCursor>(self) -> Result<C> {
253 let selection_criteria = self
254 .options
255 .as_ref()
256 .and_then(|options| options.selection_criteria.clone());
257 let rcc =
258 run_command::RunCommand::new(self.db.clone(), self.command?, selection_criteria, None);
259 let mut rc_command = run_cursor_command::RunCursorCommand::new(rcc, self.options)?;
260 let client = self.db.client();
261 client.execute_cursor_operation(&mut rc_command, None).await
262 }
263
264 pub async fn batch(self) -> Result<crate::raw_batch_cursor::RawBatchCursor> {
266 self.exec_generic().await
267 }
268}
269
270#[action_impl(sync = crate::sync::SessionCursor<Document>)]
271impl<'a> Action for RunCursorCommand<'a, ExplicitSession<'a>> {
272 type Future = RunCursorCommandSessionFuture;
273
274 async fn execute(mut self) -> Result<SessionCursor<Document>> {
275 self.exec_generic().await
276 }
277}
278
279impl<'a> RunCursorCommand<'a, ExplicitSession<'a>> {
280 async fn exec_generic<C: crate::cursor::NewCursor>(self) -> Result<C> {
281 let selection_criteria = self
282 .options
283 .as_ref()
284 .and_then(|options| options.selection_criteria.clone());
285 let rcc =
286 run_command::RunCommand::new(self.db.clone(), self.command?, selection_criteria, None);
287 let mut rc_command = run_cursor_command::RunCursorCommand::new(rcc, self.options)?;
288 let client = self.db.client();
289 client
290 .execute_cursor_operation(&mut rc_command, Some(self.session.0))
291 .await
292 }
293
294 pub async fn batch(self) -> Result<crate::raw_batch_cursor::SessionRawBatchCursor> {
296 self.exec_generic().await
297 }
298}