1use crate::blr::{input_blr, message_blr, prepare_info_items};
21use crate::connection::Connection;
22use crate::error::{Error, Result};
23use crate::message::{decode_row, encode_row};
24use crate::transaction::Transaction;
25use crate::value::{ColumnMeta, Value};
26use crate::wire::consts::*;
27use crate::wire::response::{read_op, read_response, read_response_body};
28use crate::wire::stream::{op_name, op_packet};
29use crate::wire::xdr::{read_le_int, read_le_int_signed};
30
/// SQL dialect value written into every `PREPARE_STATEMENT` request.
const SQL_DIALECT: i32 = 3;

/// Reply-buffer length requested for the describe info returned by a prepare.
const INFO_BUFFER_LEN: i32 = 0xfb80;

/// Fetch size a freshly prepared [`Statement`] starts out with.
const FETCH_BATCH: i32 = 200;

/// Reply-buffer length requested when querying a statement's record counts.
const RECORDS_BUFFER_LEN: i32 = 64;
48
/// Per-operation row counts reported for a statement.
///
/// Each field holds the count parsed from the matching sub-item of the
/// statement's "records" info block; counts that were not reported stay `0`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct RowsAffected {
    /// Number of rows selected.
    pub selected: u64,
    /// Number of rows inserted.
    pub inserted: u64,
    /// Number of rows updated.
    pub updated: u64,
    /// Number of rows deleted.
    pub deleted: u64,
}

impl RowsAffected {
    /// Returns the number of rows changed: inserted + updated + deleted.
    ///
    /// `selected` is not included. The sum saturates at `u64::MAX` rather than
    /// panicking (debug builds) or wrapping (release builds) on overflow.
    pub fn total_modified(&self) -> u64 {
        self.inserted
            .saturating_add(self.updated)
            .saturating_add(self.deleted)
    }
}
70
71#[derive(Debug)]
73pub struct Statement {
74 handle: i32,
75 stmt_type: i32,
76 params: Vec<ColumnMeta>,
77 columns: Vec<ColumnMeta>,
78 cursor_open: bool,
80 buffered: std::collections::VecDeque<Vec<Value>>,
82 exhausted: bool,
84 scrollable: bool,
87 fetch_size: i32,
89 dropped: bool,
90}
91
92impl Statement {
93 pub fn handle(&self) -> i32 {
95 self.handle
96 }
97
98 pub fn stmt_type(&self) -> i32 {
100 self.stmt_type
101 }
102
103 pub fn is_select(&self) -> bool {
105 self.stmt_type == stmt_type::SELECT || self.stmt_type == stmt_type::SELECT_FOR_UPD
106 }
107
108 pub fn set_scrollable(&mut self, yes: bool) {
113 self.scrollable = yes;
114 }
115
116 pub fn is_scrollable(&self) -> bool {
118 self.scrollable
119 }
120
121 pub fn set_fetch_size(&mut self, n: i32) {
126 self.fetch_size = n.max(1);
127 }
128
129 pub fn fetch_size(&self) -> i32 {
131 self.fetch_size
132 }
133
134 pub fn columns(&self) -> &[ColumnMeta] {
136 &self.columns
137 }
138
139 pub fn params(&self) -> &[ColumnMeta] {
141 &self.params
142 }
143
144 pub fn execute(
148 &mut self,
149 conn: &mut Connection,
150 tx: &Transaction,
151 params: &[Value],
152 ) -> Result<()> {
153 let has_params = !self.params.is_empty();
154 let in_blr = if has_params {
155 input_blr(&self.params)
156 } else {
157 Vec::new()
158 };
159 let message = if has_params {
160 encode_row(&self.params, params, conn.charset())?
161 } else {
162 Vec::new()
163 };
164
165 let mut w = op_packet(op::EXECUTE);
166 w.put_i32(self.handle);
167 w.put_i32(tx.handle());
168 w.put_bytes(&in_blr); w.put_i32(0); w.put_i32(if has_params { 1 } else { 0 }); if has_params {
175 w.put_raw(&message);
176 w.align();
177 }
178 w.put_bytes(&[]); w.put_i32(if self.scrollable {
186 cursor_type::SCROLLABLE
187 } else {
188 0
189 });
190 if conn.protocol_version() >= 18 {
197 w.put_i32(0); }
199 conn.io().send(&w)?;
200 read_response(conn.io())?;
201
202 self.cursor_open = self.is_select();
203 self.buffered.clear();
204 self.exhausted = false;
205 Ok(())
206 }
207
208 pub fn fetch(&mut self, conn: &mut Connection) -> Result<Option<Vec<Value>>> {
211 loop {
212 if let Some(row) = self.buffered.pop_front() {
213 return Ok(Some(row));
214 }
215 if self.exhausted || !self.cursor_open {
216 self.cursor_open = false;
217 return Ok(None);
218 }
219 self.fetch_batch(conn)?;
220 }
221 }
222
223 fn fetch_batch(&mut self, conn: &mut Connection) -> Result<()> {
227 let out_blr = message_blr(&self.columns);
228 let mut w = op_packet(op::FETCH);
229 w.put_i32(self.handle);
230 w.put_bytes(&out_blr);
231 w.put_i32(0); w.put_i32(self.fetch_size);
233 conn.io().send(&w)?;
234
235 loop {
236 let code = read_op(conn.io())?;
237 if code != op::FETCH_RESPONSE {
238 if code == op::RESPONSE {
239 read_response_body(conn.io())?.into_result()?;
241 }
242 return Err(Error::protocol(format!(
243 "expected op_fetch_response, got {} ({code})",
244 op_name(code)
245 )));
246 }
247
248 let status = conn.io().read_i32()?; let count = conn.io().read_i32()?; if count == 0 {
251 self.exhausted = status == 100;
254 return Ok(());
255 }
256
257 let cs = conn.charset();
258 let row = decode_row(conn.io(), &self.columns, cs)?;
259 self.buffered.push_back(row);
260 if status == 100 {
261 self.exhausted = true;
262 return Ok(());
263 }
264 }
265 }
266
267 pub fn fetch_all(&mut self, conn: &mut Connection) -> Result<Vec<Vec<Value>>> {
269 let mut rows = Vec::new();
270 while let Some(row) = self.fetch(conn)? {
271 rows.push(row);
272 }
273 Ok(rows)
274 }
275
276 pub fn rows<'a>(&'a mut self, conn: &'a mut Connection) -> RowStream<'a> {
288 RowStream { stmt: self, conn }
289 }
290
291 pub fn fetch_scroll(
299 &mut self,
300 conn: &mut Connection,
301 direction: i32,
302 offset: i32,
303 ) -> Result<Option<Vec<Value>>> {
304 if !self.cursor_open {
305 return Ok(None);
306 }
307 self.buffered.clear();
309
310 let out_blr = message_blr(&self.columns);
311 let mut w = op_packet(op::FETCH_SCROLL);
312 w.put_i32(self.handle);
313 w.put_bytes(&out_blr);
314 w.put_i32(0); w.put_i32(1); w.put_i32(direction);
317 w.put_i32(offset);
318 conn.io().send(&w)?;
319
320 let mut row = None;
323 loop {
324 let code = read_op(conn.io())?;
325 if code != op::FETCH_RESPONSE {
326 if code == op::RESPONSE {
327 read_response_body(conn.io())?.into_result()?;
328 }
329 return Err(Error::protocol(format!(
330 "expected op_fetch_response, got {} ({code})",
331 op_name(code)
332 )));
333 }
334 let status = conn.io().read_i32()?;
335 let count = conn.io().read_i32()?;
336 if count == 0 {
337 break;
338 }
339 let cs = conn.charset();
340 let r = decode_row(conn.io(), &self.columns, cs)?;
341 if row.is_none() {
342 row = Some(r);
343 }
344 if status == 100 {
345 break;
346 }
347 }
348 self.exhausted = false;
350 Ok(row)
351 }
352
353 pub fn fetch_next(&mut self, conn: &mut Connection) -> Result<Option<Vec<Value>>> {
355 self.fetch_scroll(conn, scroll::NEXT, 0)
356 }
357
358 pub fn fetch_prior(&mut self, conn: &mut Connection) -> Result<Option<Vec<Value>>> {
360 self.fetch_scroll(conn, scroll::PRIOR, 0)
361 }
362
363 pub fn fetch_first(&mut self, conn: &mut Connection) -> Result<Option<Vec<Value>>> {
365 self.fetch_scroll(conn, scroll::FIRST, 0)
366 }
367
368 pub fn fetch_last(&mut self, conn: &mut Connection) -> Result<Option<Vec<Value>>> {
370 self.fetch_scroll(conn, scroll::LAST, 0)
371 }
372
373 pub fn fetch_absolute(
375 &mut self,
376 conn: &mut Connection,
377 pos: i32,
378 ) -> Result<Option<Vec<Value>>> {
379 self.fetch_scroll(conn, scroll::ABSOLUTE, pos)
380 }
381
382 pub fn fetch_relative(
384 &mut self,
385 conn: &mut Connection,
386 offset: i32,
387 ) -> Result<Option<Vec<Value>>> {
388 self.fetch_scroll(conn, scroll::RELATIVE, offset)
389 }
390
391 pub fn rows_affected(&self, conn: &mut Connection) -> Result<RowsAffected> {
395 let w = crate::connection::info_request(
396 op::INFO_SQL,
397 self.handle,
398 &[isql::RECORDS],
399 RECORDS_BUFFER_LEN,
400 );
401 conn.io().send(&w)?;
402 let resp = read_response(conn.io())?;
403 Ok(parse_records(&resp.data))
404 }
405
406 pub fn close(&mut self, conn: &mut Connection) -> Result<()> {
409 if !self.cursor_open {
410 return Ok(());
411 }
412 self.free(conn, free::CLOSE)?;
413 self.cursor_open = false;
414 Ok(())
415 }
416
417 pub fn drop_statement(mut self, conn: &mut Connection) -> Result<()> {
420 self.free(conn, free::DROP)?;
421 self.dropped = true;
422 Ok(())
423 }
424
425 fn free(&mut self, conn: &mut Connection, mode: i32) -> Result<()> {
426 let mut w = op_packet(op::FREE_STATEMENT);
427 w.put_i32(self.handle);
428 w.put_i32(mode);
429 conn.io().send(&w)?;
430 read_response(conn.io())?;
431 Ok(())
432 }
433
434 pub(crate) fn forget_handle(&mut self) {
438 self.dropped = true;
439 }
440}
441
442impl Drop for Statement {
443 fn drop(&mut self) {
444 if !self.dropped {
445 crate::warn_unclosed("Statement", self.handle);
446 }
447 }
448}
449
450pub struct RowStream<'a> {
460 stmt: &'a mut Statement,
461 conn: &'a mut Connection,
462}
463
464impl RowStream<'_> {
465 pub fn try_next(&mut self) -> Result<Option<Vec<Value>>> {
467 self.stmt.fetch(self.conn)
468 }
469
470 #[allow(clippy::should_implement_trait)]
475 pub fn next(&mut self) -> Result<Option<Vec<Value>>> {
476 self.try_next()
477 }
478
479 pub fn try_collect(mut self) -> Result<Vec<Vec<Value>>> {
482 let mut rows = Vec::new();
483 while let Some(row) = self.try_next()? {
484 rows.push(row);
485 }
486 Ok(rows)
487 }
488
489 pub fn try_for_each<F>(mut self, mut f: F) -> Result<()>
491 where
492 F: FnMut(Vec<Value>) -> Result<()>,
493 {
494 while let Some(row) = self.try_next()? {
495 f(row)?;
496 }
497 Ok(())
498 }
499}
500
501impl Connection {
502 pub fn prepare(&mut self, tx: &Transaction, sql: &str) -> Result<Statement> {
504 let mut w = op_packet(op::ALLOCATE_STATEMENT);
506 w.put_i32(self.db_handle());
507 self.io().send(&w)?;
508 let handle = read_response(self.io())?.handle;
509
510 let mut w = op_packet(op::PREPARE_STATEMENT);
513 w.put_i32(tx.handle());
514 w.put_i32(handle);
515 w.put_i32(SQL_DIALECT);
516 w.put_str(sql);
517 w.put_bytes(prepare_info_items());
518 w.put_i32(INFO_BUFFER_LEN);
519 self.io().send(&w)?;
520 let resp = read_response(self.io())?;
521
522 let info = parse_prepare_response(&resp.data)?;
523 Ok(Statement {
524 handle,
525 stmt_type: info.stmt_type,
526 params: info.params,
527 columns: info.columns,
528 cursor_open: false,
529 buffered: std::collections::VecDeque::new(),
530 exhausted: false,
531 scrollable: false,
532 fetch_size: FETCH_BATCH,
533 dropped: false,
534 })
535 }
536}
537
538struct PreparedInfo {
540 stmt_type: i32,
541 params: Vec<ColumnMeta>,
542 columns: Vec<ColumnMeta>,
543}
544
/// Which section of the describe info is currently being read; decides where
/// a finished column description is stored.
#[derive(Clone, Copy, PartialEq)]
enum Block {
    /// No section marker seen yet; finished descriptions are discarded.
    None,
    /// Input-parameter section; descriptions go to the parameter list.
    Bind,
    /// Output-column section; descriptions go to the column list.
    Select,
}
552
553fn parse_prepare_response(data: &[u8]) -> Result<PreparedInfo> {
557 let mut stmt_type = 0;
558 let mut params = Vec::new();
559 let mut columns = Vec::new();
560 let mut block = Block::None;
561 let mut cur: Option<ColumnMeta> = None;
562
563 let mut i = 0;
564 while i < data.len() {
565 let tag = data[i];
566 i += 1;
567 match tag {
568 INFO_END => break,
569 INFO_TRUNCATED => {
570 return Err(Error::protocol(
571 "prepare describe-info truncated; buffer too small",
572 ));
573 }
574 isql::SELECT => block = Block::Select,
575 isql::BIND => block = Block::Bind,
576 isql::DESCRIBE_END => {
577 if let Some(c) = cur.take() {
578 match block {
579 Block::Bind => params.push(c),
580 Block::Select => columns.push(c),
581 Block::None => {}
582 }
583 }
584 }
585 _ => {
586 if i + 2 > data.len() {
588 return Err(Error::protocol("prepare describe-info: short length"));
589 }
590 let len = u16::from_le_bytes([data[i], data[i + 1]]) as usize;
591 i += 2;
592 if i + len > data.len() {
593 return Err(Error::protocol("prepare describe-info: short value"));
594 }
595 let val = &data[i..i + len];
596 i += len;
597 apply_info_item(tag, val, &mut stmt_type, &mut cur);
598 }
599 }
600 }
601
602 Ok(PreparedInfo {
603 stmt_type,
604 params,
605 columns,
606 })
607}
608
609fn apply_info_item(tag: u8, val: &[u8], stmt_type: &mut i32, cur: &mut Option<ColumnMeta>) {
610 match tag {
611 isql::STMT_TYPE => *stmt_type = read_le_int(val) as i32,
612 isql::SQLDA_SEQ => {
613 let seq = read_le_int(val) as usize;
615 *cur = Some(ColumnMeta {
616 index: seq.saturating_sub(1),
617 ..Default::default()
618 });
619 }
620 isql::TYPE => {
621 if let Some(c) = cur.as_mut() {
622 let t = read_le_int(val) as i32;
623 c.sql_type = t;
624 c.nullable = sql_type::is_nullable(t);
625 }
626 }
627 isql::SUB_TYPE => {
628 if let Some(c) = cur.as_mut() {
629 c.sub_type = read_le_int_signed(val) as i32;
630 }
631 }
632 isql::SCALE => {
633 if let Some(c) = cur.as_mut() {
634 c.scale = read_le_int_signed(val) as i32;
635 }
636 }
637 isql::LENGTH => {
638 if let Some(c) = cur.as_mut() {
639 c.length = read_le_int(val) as i32;
640 }
641 }
642 isql::FIELD => set_name(cur, val, |c, s| c.field = s),
643 isql::RELATION => set_name(cur, val, |c, s| c.relation = s),
644 isql::ALIAS => set_name(cur, val, |c, s| c.alias = s),
645 isql::OWNER => set_name(cur, val, |c, s| c.owner = s),
646 _ => {}
649 }
650}
651
652fn set_name(cur: &mut Option<ColumnMeta>, val: &[u8], assign: impl Fn(&mut ColumnMeta, String)) {
653 if let Some(c) = cur.as_mut() {
654 assign(c, String::from_utf8_lossy(val).into_owned());
655 }
656}
657
658fn parse_records(data: &[u8]) -> RowsAffected {
662 let mut out = RowsAffected::default();
663 for (tag, val) in InfoItems::new(data) {
664 if tag == isql::RECORDS {
665 for (sub, v) in InfoItems::new(val) {
666 let n = read_le_int(v) as u64;
667 match sub {
668 info_req::SELECT_COUNT => out.selected = n,
669 info_req::INSERT_COUNT => out.inserted = n,
670 info_req::UPDATE_COUNT => out.updated = n,
671 info_req::DELETE_COUNT => out.deleted = n,
672 _ => {}
673 }
674 }
675 }
676 }
677 out
678}
679
680struct InfoItems<'a> {
683 data: &'a [u8],
684 pos: usize,
685}
686
687impl<'a> InfoItems<'a> {
688 fn new(data: &'a [u8]) -> Self {
689 InfoItems { data, pos: 0 }
690 }
691}
692
693impl<'a> Iterator for InfoItems<'a> {
694 type Item = (u8, &'a [u8]);
695
696 fn next(&mut self) -> Option<Self::Item> {
697 let tag = *self.data.get(self.pos)?;
698 if tag == INFO_END {
699 return None;
700 }
701 self.pos += 1;
702 let lo = *self.data.get(self.pos)? as usize;
703 let hi = *self.data.get(self.pos + 1)? as usize;
704 let len = lo | (hi << 8);
705 self.pos += 2;
706 let val = self.data.get(self.pos..self.pos + len)?;
707 self.pos += len;
708 Some((tag, val))
709 }
710}
711
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes one info item: tag, two-byte little-endian length, value bytes.
    fn item(tag: u8, val: &[u8]) -> Vec<u8> {
        let len = (val.len() as u16).to_le_bytes();
        let mut encoded = Vec::with_capacity(3 + val.len());
        encoded.push(tag);
        encoded.extend_from_slice(&len);
        encoded.extend_from_slice(val);
        encoded
    }

    #[test]
    fn parses_select_describe_for_two_columns() {
        let data = [
            item(isql::STMT_TYPE, &stmt_type::SELECT.to_le_bytes()),
            // Empty parameter section.
            vec![isql::BIND],
            item(isql::DESCRIBE_VARS, &0i32.to_le_bytes()),
            // Column section announcing two columns.
            vec![isql::SELECT],
            item(isql::DESCRIBE_VARS, &2i32.to_le_bytes()),
            // First column; the low type bit is set so it is reported as nullable.
            item(isql::SQLDA_SEQ, &1i32.to_le_bytes()),
            item(isql::TYPE, &(sql_type::SHORT | 1).to_le_bytes()),
            item(isql::SUB_TYPE, &0i32.to_le_bytes()),
            item(isql::SCALE, &0i32.to_le_bytes()),
            item(isql::LENGTH, &2i32.to_le_bytes()),
            item(isql::FIELD, b"EMP_NO"),
            item(isql::ALIAS, b"EMP_NO"),
            vec![isql::DESCRIBE_END],
            // Second column.
            item(isql::SQLDA_SEQ, &2i32.to_le_bytes()),
            item(isql::TYPE, &sql_type::VARYING.to_le_bytes()),
            item(isql::SUB_TYPE, &0i32.to_le_bytes()),
            item(isql::SCALE, &0i32.to_le_bytes()),
            item(isql::LENGTH, &15i32.to_le_bytes()),
            item(isql::FIELD, b"FIRST_NAME"),
            item(isql::ALIAS, b"FIRST_NAME"),
            vec![isql::DESCRIBE_END],
            vec![INFO_END],
        ]
        .concat();

        let info = parse_prepare_response(&data).unwrap();
        assert_eq!(info.stmt_type, stmt_type::SELECT);
        assert!(info.params.is_empty());
        assert_eq!(info.columns.len(), 2);

        let emp_no = &info.columns[0];
        assert_eq!(emp_no.index, 0);
        assert_eq!(sql_type::base(emp_no.sql_type), sql_type::SHORT);
        assert!(emp_no.nullable);
        assert_eq!(emp_no.name(), "EMP_NO");

        let first_name = &info.columns[1];
        assert_eq!(sql_type::base(first_name.sql_type), sql_type::VARYING);
        assert_eq!(first_name.length, 15);
        assert!(!first_name.nullable);
        assert_eq!(first_name.name(), "FIRST_NAME");
    }

    #[test]
    fn truncated_info_is_an_error() {
        let data = [INFO_TRUNCATED];
        assert!(parse_prepare_response(&data).is_err());
    }

    #[test]
    fn parses_record_counts() {
        /// Encodes one counter sub-item with a fixed four-byte value.
        fn sub(tag: u8, n: i32) -> Vec<u8> {
            let mut encoded = vec![tag, 4, 0];
            encoded.extend_from_slice(&n.to_le_bytes());
            encoded
        }

        let nested = [
            sub(info_req::SELECT_COUNT, 5),
            sub(info_req::INSERT_COUNT, 0),
            sub(info_req::UPDATE_COUNT, 5),
            sub(info_req::DELETE_COUNT, 0),
        ]
        .concat();

        let mut data = vec![isql::RECORDS];
        data.extend_from_slice(&(nested.len() as u16).to_le_bytes());
        data.extend_from_slice(&nested);
        data.push(INFO_END);

        let r = parse_records(&data);
        assert_eq!(r.selected, 5);
        assert_eq!(r.updated, 5);
        assert_eq!(r.inserted, 0);
        assert_eq!(r.deleted, 0);
        assert_eq!(r.total_modified(), 5);
    }

    #[test]
    fn negative_scale_is_sign_extended() {
        let data = [
            vec![isql::SELECT],
            item(isql::SQLDA_SEQ, &1i32.to_le_bytes()),
            item(isql::TYPE, &sql_type::INT64.to_le_bytes()),
            item(isql::SCALE, &(-2i32).to_le_bytes()),
            item(isql::LENGTH, &8i32.to_le_bytes()),
            vec![isql::DESCRIBE_END],
            vec![INFO_END],
        ]
        .concat();

        let info = parse_prepare_response(&data).unwrap();
        assert_eq!(info.columns[0].scale, -2);
    }
}