sentinel_driver/connection/
prepare.rs1use super::{frontend, BackendMessage, CacheMetrics, Connection, Error, Oid, Result, Statement};
2
3impl Connection {
4 pub async fn prepare(&mut self, sql: &str) -> Result<Statement> {
13 self.instr().on_event(&crate::Event::PrepareStart {
14 name: "", sql,
16 });
17 let started = std::time::Instant::now();
18 let res = self.prepare_inner(sql).await;
19 let duration = started.elapsed();
20 if let Ok(stmt) = &res {
21 let raw_oids: Vec<u32> = stmt.param_types().iter().map(|o| o.0).collect();
25 self.instr().on_event(&crate::Event::PrepareFinish {
26 name: stmt.name(),
27 param_oids: &raw_oids,
28 col_count: stmt.column_count() as u16,
29 duration,
30 cache_hit: false, });
32 }
33 res
38 }
39
40 async fn prepare_inner(&mut self, sql: &str) -> Result<Statement> {
43 let stmt_name = format!("_sentinel_p{}", self.process_id);
44
45 frontend::parse(self.conn.write_buf(), &stmt_name, sql, &[]);
46 frontend::describe_statement(self.conn.write_buf(), &stmt_name);
47 frontend::sync(self.conn.write_buf());
48 self.conn.send().await?;
49
50 match self.conn.recv().await? {
52 BackendMessage::ParseComplete => {}
53 BackendMessage::ErrorResponse { fields } => {
54 self.drain_until_ready().await.ok();
55 return Err(Error::server(
56 fields.severity,
57 fields.code,
58 fields.message,
59 fields.detail,
60 fields.hint,
61 fields.position,
62 ));
63 }
64 other => {
65 return Err(Error::protocol(format!(
66 "expected ParseComplete, got {other:?}"
67 )))
68 }
69 }
70
71 let param_oids = match self.conn.recv().await? {
73 BackendMessage::ParameterDescription { oids } => {
74 oids.into_iter().map(Oid::from).collect()
75 }
76 other => {
77 return Err(Error::protocol(format!(
78 "expected ParameterDescription, got {other:?}"
79 )))
80 }
81 };
82
83 let columns = match self.conn.recv().await? {
85 BackendMessage::RowDescription { fields } => Some(fields),
86 BackendMessage::NoData => None,
87 other => {
88 return Err(Error::protocol(format!(
89 "expected RowDescription/NoData, got {other:?}"
90 )))
91 }
92 };
93
94 self.drain_until_ready().await?;
96
97 Ok(Statement::new(
98 stmt_name,
99 sql.to_string(),
100 param_oids,
101 columns,
102 ))
103 }
104
105 pub fn register_statement(&mut self, name: &str, statement: Statement) {
107 self.stmt_cache.register(name, statement);
108 }
109
110 pub fn cache_metrics(&self) -> &CacheMetrics {
112 self.stmt_cache.metrics()
113 }
114}