// sentinel_driver/connection/prepare.rs

1use super::{frontend, BackendMessage, CacheMetrics, Connection, Error, Oid, Result, Statement};
2
3impl Connection {
4    /// Prepare a statement on the server using extended query protocol.
5    ///
6    /// Returns a `Statement` with parameter types and column descriptions.
7    ///
8    /// Emits `PrepareStart` before the wire round-trip and `PrepareFinish` on
9    /// success. `cache_hit` is always `false` today because `prepare()` always
10    /// sends Parse; a future task that wires automatic cache lookup will toggle
11    /// that field without breaking the API.
12    pub async fn prepare(&mut self, sql: &str) -> Result<Statement> {
13        self.instr().on_event(&crate::Event::PrepareStart {
14            name: "", // server-side name not yet assigned at this point
15            sql,
16        });
17        let started = std::time::Instant::now();
18        let res = self.prepare_inner(sql).await;
19        let duration = started.elapsed();
20        if let Ok(stmt) = &res {
21            // Collect param OIDs as raw u32 for the event (Oid is Vec<Oid>
22            // internally; Option B — one allocation on the prepare path, which
23            // is already a full network round-trip).
24            let raw_oids: Vec<u32> = stmt.param_types().iter().map(|o| o.0).collect();
25            self.instr().on_event(&crate::Event::PrepareFinish {
26                name: stmt.name(),
27                param_oids: &raw_oids,
28                col_count: stmt.column_count() as u16,
29                duration,
30                cache_hit: false, // current prepare() never consults the cache
31            });
32        }
33        // On error we don't emit PrepareFinish — the prepare failed; consumers
34        // will see ExecuteFinish with the same Err if a downstream query was
35        // attempted, or no event at all if the caller just discards the error.
36        // (Matches the design doc's borrowing model: events are best-effort.)
37        res
38    }
39
40    /// Inner prepare implementation — sends Parse/Describe/Sync and awaits the
41    /// server responses. Called exclusively by `prepare()`.
42    async fn prepare_inner(&mut self, sql: &str) -> Result<Statement> {
43        let stmt_name = format!("_sentinel_p{}", self.process_id);
44
45        frontend::parse(self.conn.write_buf(), &stmt_name, sql, &[]);
46        frontend::describe_statement(self.conn.write_buf(), &stmt_name);
47        frontend::sync(self.conn.write_buf());
48        self.conn.send().await?;
49
50        // ParseComplete
51        match self.conn.recv().await? {
52            BackendMessage::ParseComplete => {}
53            BackendMessage::ErrorResponse { fields } => {
54                self.drain_until_ready().await.ok();
55                return Err(Error::server(
56                    fields.severity,
57                    fields.code,
58                    fields.message,
59                    fields.detail,
60                    fields.hint,
61                    fields.position,
62                ));
63            }
64            other => {
65                return Err(Error::protocol(format!(
66                    "expected ParseComplete, got {other:?}"
67                )))
68            }
69        }
70
71        // ParameterDescription
72        let param_oids = match self.conn.recv().await? {
73            BackendMessage::ParameterDescription { oids } => {
74                oids.into_iter().map(Oid::from).collect()
75            }
76            other => {
77                return Err(Error::protocol(format!(
78                    "expected ParameterDescription, got {other:?}"
79                )))
80            }
81        };
82
83        // RowDescription or NoData
84        let columns = match self.conn.recv().await? {
85            BackendMessage::RowDescription { fields } => Some(fields),
86            BackendMessage::NoData => None,
87            other => {
88                return Err(Error::protocol(format!(
89                    "expected RowDescription/NoData, got {other:?}"
90                )))
91            }
92        };
93
94        // ReadyForQuery
95        self.drain_until_ready().await?;
96
97        Ok(Statement::new(
98            stmt_name,
99            sql.to_string(),
100            param_oids,
101            columns,
102        ))
103    }
104
105    /// Register a prepared statement in the Tier 1 cache.
106    pub fn register_statement(&mut self, name: &str, statement: Statement) {
107        self.stmt_cache.register(name, statement);
108    }
109
110    /// Get statement cache metrics.
111    pub fn cache_metrics(&self) -> &CacheMetrics {
112        self.stmt_cache.metrics()
113    }
114}