Skip to main content

couchbase_core/queryx/
preparedquery.rs

1/*
2 *
3 *  * Copyright (c) 2025 Couchbase, Inc.
4 *  *
5 *  * Licensed under the Apache License, Version 2.0 (the "License");
6 *  * you may not use this file except in compliance with the License.
7 *  * You may obtain a copy of the License at
8 *  *
9 *  *    http://www.apache.org/licenses/LICENSE-2.0
10 *  *
11 *  * Unless required by applicable law or agreed to in writing, software
12 *  * distributed under the License is distributed on an "AS IS" BASIS,
13 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  * See the License for the specific language governing permissions and
15 *  * limitations under the License.
16 *
17 */
18
19use std::collections::HashMap;
20use std::sync::{Arc, Mutex};
21
22use crate::httpx::client::Client;
23use crate::queryx::error;
24use crate::queryx::error::Error;
25use crate::queryx::query::Query;
26use crate::queryx::query_options::QueryOptions;
27use crate::queryx::query_respreader::QueryRespReader;
28
29#[derive(Clone, Debug, Default)]
30pub struct PreparedStatementCache {
31    cache: HashMap<String, String>,
32}
33
34impl PreparedStatementCache {
35    pub fn get(&self, statement: &str) -> Option<&String> {
36        self.cache.get(statement)
37    }
38
39    pub fn put(&mut self, statement: &str, prepared_name: &str) {
40        self.cache
41            .insert(statement.to_string(), prepared_name.to_string());
42    }
43}
44
45pub struct PreparedQuery<C: Client> {
46    pub executor: Query<C>,
47    pub cache: Arc<Mutex<PreparedStatementCache>>,
48}
49
50impl<C: Client> PreparedQuery<C> {
51    pub async fn prepared_query(&self, opts: &QueryOptions) -> error::Result<QueryRespReader> {
52        // We need to clone the options so that we can modify it with any cached statement.
53        let mut opts = (*opts).clone();
54
55        if let Some(ae) = opts.auto_execute {
56            // If this is already marked as auto-execute, we just pass it through
57            if ae {
58                return self.executor.query(&opts).await;
59            }
60        }
61
62        let statement = if let Some(statement) = opts.statement {
63            statement
64        } else {
65            return Err(Error::new_invalid_argument_error(
66                "statement must be present if auto_execute is true",
67                Some("statement".to_string()),
68            ));
69        };
70
71        // We have to manage the scope of the cache here, static analysis will flag us as holding
72        // the mutex across the await even if we manually drop just before it.
73        let cached;
74        {
75            let cache = self.cache.lock().unwrap();
76            cached = cache.get(&statement).cloned();
77        }
78
79        if let Some(cached_statement) = cached {
80            opts.statement = None;
81            opts.prepared = Some(cached_statement);
82
83            let res = self.executor.query(&opts).await;
84            if let Ok(reader) = res {
85                return Ok(reader);
86            }
87        };
88
89        opts.statement = Some(format!("PREPARE {}", &statement));
90        opts.auto_execute = Some(true);
91
92        let res = self.executor.query(&opts).await?;
93
94        let early_metadata = res.early_metadata();
95        if let Some(prepared) = &early_metadata.prepared {
96            let mut cache = self.cache.lock().unwrap();
97            cache.put(&statement, prepared);
98            drop(cache);
99        }
100
101        Ok(res)
102    }
103}