Skip to main content

firebird_wire/
message.rs

1//! Decodificação de mensagens de linha.
2//!
3//! Formato da transmissão (payload do op_fetch_response): um bitmap de nulos
4//! little-endian inicial de `align4(ceil(ncols/8))` bytes (bit *i* ligado ⇒
5//! coluna *i* é NULL), seguido do valor codificado em XDR de cada coluna
6//! NÃO-NULL, em ordem. Colunas nulas não contribuem com bytes, então as
7//! mensagens têm comprimento variável e devem ser decodificadas campo a campo
8//! direto do stream.
9
10use crate::charset::Charset;
11use crate::error::{Error, Result};
12use crate::value::{ColumnMeta, Value, align4};
13use crate::wire::consts::sql_type;
14use crate::wire::stream::FbStream;
15
16/// Número de bytes no bitmap de nulos inicial para `ncols` colunas.
17pub fn null_bitmap_len(ncols: usize) -> usize {
18    align4(ncols.div_ceil(8))
19}
20
21/// Comprimento do buffer de mensagem *do lado do cliente* (não compactado) que o
22/// servidor espera em `op_batch_create` (`p_batch_msglen`). É o layout que o BLR
23/// descreve: cada campo é alinhado à sua fronteira natural, seguido de um
24/// indicador de nulo `SQL_SHORT` (2 bytes, alinhamento 2). Sem arredondamento
25/// final — confirmado por captura (INTEGER + VARCHAR(20) → 30 bytes).
26pub fn message_buffer_len(columns: &[ColumnMeta]) -> u32 {
27    let mut off: usize = 0;
28    for col in columns {
29        let (len, alignment) = type_size_align(col);
30        off = align_up(off, alignment);
31        off += len;
32        // indicador de nulo: SQL_SHORT, 2 bytes, alinhamento 2.
33        off = align_up(off, 2);
34        off += 2;
35    }
36    off as u32
37}
38
39#[inline]
40fn align_up(n: usize, alignment: usize) -> usize {
41    (n + alignment - 1) & !(alignment - 1)
42}
43
44/// `(comprimento dos dados, alinhamento)` de um campo no buffer de mensagem.
45fn type_size_align(col: &ColumnMeta) -> (usize, usize) {
46    let n = col.length as usize;
47    match sql_type::base(col.sql_type) {
48        sql_type::TEXT => (n, 1),
49        sql_type::VARYING => (n + 2, 2),
50        sql_type::SHORT => (2, 2),
51        sql_type::LONG => (4, 4),
52        sql_type::FLOAT => (4, 4),
53        sql_type::TYPE_DATE | sql_type::TYPE_TIME => (4, 4),
54        sql_type::INT64 => (8, 8),
55        sql_type::DOUBLE | sql_type::D_FLOAT => (8, 8),
56        sql_type::TIMESTAMP => (8, 4),
57        sql_type::BLOB | sql_type::QUAD | sql_type::ARRAY => (8, 4),
58        sql_type::INT128 => (16, 8),
59        sql_type::BOOLEAN => (1, 1),
60        sql_type::DEC16 => (8, 8),
61        sql_type::DEC34 => (16, 8),
62        // Representação em memória (não-`_EX`): ISC_TIME_TZ = 6 B, ISC_TIMESTAMP_TZ = 10 B.
63        sql_type::TIME_TZ | sql_type::TIME_TZ_EX => (6, 4),
64        sql_type::TIMESTAMP_TZ | sql_type::TIMESTAMP_TZ_EX => (10, 4),
65        _ => (8, 8),
66    }
67}
68
69/// Codifica uma linha (mensagem de parâmetros de entrada) no formato de
70/// transmissão que o servidor espera: um bitmap de nulos little-endian inicial
71/// seguido do valor XDR de cada coluna NÃO-NULL, em ordem. O inverso de
72/// [`decode_row`].
73pub fn encode_row(columns: &[ColumnMeta], values: &[Value], charset: Charset) -> Result<Vec<u8>> {
74    let mut out = Vec::new();
75    encode_row_into(&mut out, columns, values, charset)?;
76    Ok(out)
77}
78
79/// Como [`encode_row`], mas **anexa** a mensagem ao fim de `out`, sem alocar um buffer
80/// temporário por linha — útil para acumular muitas linhas (ver [`crate::Batch::add`]).
81///
82/// `out` deve começar numa fronteira de 4 bytes, o que vale ao concatenar mensagens
83/// que já terminam alinhadas a 4. Em caso de erro, `out` é restaurado ao tamanho que
84/// tinha na entrada (a mensagem parcial é descartada), preservando a atomicidade.
85pub fn encode_row_into(
86    out: &mut Vec<u8>,
87    columns: &[ColumnMeta],
88    values: &[Value],
89    charset: Charset,
90) -> Result<()> {
91    if values.len() != columns.len() {
92        return Err(Error::protocol(format!(
93            "parameter count mismatch: statement expects {}, got {}",
94            columns.len(),
95            values.len()
96        )));
97    }
98    let inicio = out.len();
99    out.resize(inicio + null_bitmap_len(columns.len()), 0);
100    for (i, (col, val)) in columns.iter().zip(values).enumerate() {
101        if val.is_null() {
102            out[inicio + i / 8] |= 1 << (i % 8);
103        } else if let Err(e) = encode_value(out, col, val, charset) {
104            out.truncate(inicio); // desfaz a mensagem parcial
105            return Err(e);
106        }
107    }
108    Ok(())
109}
110
111fn put_i32_be(out: &mut Vec<u8>, v: i32) {
112    out.extend_from_slice(&v.to_be_bytes());
113}
114
115fn put_pad(out: &mut Vec<u8>, data_len: usize) {
116    for _ in 0..(align4(data_len) - data_len) {
117        out.push(0);
118    }
119}
120
121fn encode_value(out: &mut Vec<u8>, col: &ColumnMeta, val: &Value, charset: Charset) -> Result<()> {
122    let mismatch = || Error::protocol(format!("value does not fit column type {}", col.sql_type));
123    match sql_type::base(col.sql_type) {
124        sql_type::SHORT => put_i32_be(out, i32::from(val.as_i64().ok_or_else(mismatch)? as i16)),
125        sql_type::LONG => put_i32_be(out, val.as_i64().ok_or_else(mismatch)? as i32),
126        sql_type::INT64 => out.extend_from_slice(&val.as_i64().ok_or_else(mismatch)?.to_be_bytes()),
127        sql_type::INT128 => match val {
128            Value::Int128(v) => out.extend_from_slice(&v.to_be_bytes()),
129            _ => {
130                out.extend_from_slice(&i128::from(val.as_i64().ok_or_else(mismatch)?).to_be_bytes())
131            }
132        },
133        sql_type::FLOAT => match val {
134            Value::Float(f) => out.extend_from_slice(&f.to_bits().to_be_bytes()),
135            Value::Double(f) => out.extend_from_slice(&(*f as f32).to_bits().to_be_bytes()),
136            _ => return Err(mismatch()),
137        },
138        sql_type::DOUBLE | sql_type::D_FLOAT => match val {
139            Value::Double(f) => out.extend_from_slice(&f.to_bits().to_be_bytes()),
140            Value::Float(f) => out.extend_from_slice(&(f64::from(*f)).to_bits().to_be_bytes()),
141            _ => return Err(mismatch()),
142        },
143        sql_type::VARYING => {
144            let bytes = text_bytes(val, charset)?;
145            put_i32_be(out, bytes.len() as i32);
146            out.extend_from_slice(&bytes);
147            put_pad(out, bytes.len());
148        }
149        sql_type::TEXT => {
150            let bytes = text_bytes(val, charset)?;
151            let n = col.length as usize;
152            out.extend_from_slice(&bytes);
153            // Preenche CHAR(n) à direita com espaços até sua largura declarada.
154            for _ in bytes.len()..n {
155                out.push(b' ');
156            }
157            put_pad(out, n.max(bytes.len()));
158        }
159        sql_type::TYPE_DATE => put_i32_be(out, expect_date(val)?),
160        sql_type::TYPE_TIME => put_i32_be(out, expect_time(val)? as i32),
161        sql_type::TIMESTAMP => match val {
162            Value::Timestamp(d, t) => {
163                put_i32_be(out, *d);
164                put_i32_be(out, *t as i32);
165            }
166            _ => return Err(mismatch()),
167        },
168        sql_type::BOOLEAN => {
169            out.push(matches!(val, Value::Bool(true)) as u8);
170            put_pad(out, 1);
171        }
172        sql_type::DEC16 => match val {
173            Value::DecFloat(d) => out.extend_from_slice(&d.to_decimal64().ok_or_else(mismatch)?),
174            _ => return Err(mismatch()),
175        },
176        sql_type::DEC34 => match val {
177            Value::DecFloat(d) => out.extend_from_slice(&d.to_decimal128().ok_or_else(mismatch)?),
178            _ => return Err(mismatch()),
179        },
180        sql_type::BLOB | sql_type::QUAD => match val {
181            Value::Blob(id) => out.extend_from_slice(&id.to_be_bytes()),
182            _ => return Err(mismatch()),
183        },
184        // Coluna ARRAY como parâmetro: passa um id de array existente (quad 8 B).
185        sql_type::ARRAY => match val {
186            Value::Array(id) => out.extend_from_slice(&id.to_be_bytes()),
187            _ => return Err(mismatch()),
188        },
189        // WITH TIME ZONE como parâmetro: o BLR de entrada usa o formato base
190        // (não-`_EX`), então enviamos UTC + zona; o servidor recalcula o offset.
191        sql_type::TIME_TZ | sql_type::TIME_TZ_EX => match val {
192            Value::TimeTz(t) => {
193                put_i32_be(out, t.utc_time as i32);
194                put_i32_be(out, i32::from(t.zone));
195            }
196            _ => return Err(mismatch()),
197        },
198        sql_type::TIMESTAMP_TZ | sql_type::TIMESTAMP_TZ_EX => match val {
199            Value::TimestampTz(t) => {
200                put_i32_be(out, t.utc_date);
201                put_i32_be(out, t.utc_time as i32);
202                put_i32_be(out, i32::from(t.zone));
203            }
204            _ => return Err(mismatch()),
205        },
206        _ => {
207            return Err(Error::protocol(format!(
208                "unsupported parameter type {}",
209                col.sql_type
210            )));
211        }
212    }
213    Ok(())
214}
215
216fn text_bytes(val: &Value, charset: Charset) -> Result<std::borrow::Cow<'_, [u8]>> {
217    use std::borrow::Cow;
218    match val {
219        // Texto é transcodificado para o charset da conexão; bytes/OCTETS vão crus.
220        Value::Text(s) => Ok(Cow::Owned(charset.encode(s))),
221        Value::Bytes(b) => Ok(Cow::Borrowed(b)),
222        _ => Err(Error::protocol("expected a text/bytes value")),
223    }
224}
225
226fn expect_date(val: &Value) -> Result<i32> {
227    match val {
228        Value::Date(d) => Ok(*d),
229        Value::Timestamp(d, _) => Ok(*d),
230        _ => Err(Error::protocol("expected a DATE value")),
231    }
232}
233
234fn expect_time(val: &Value) -> Result<u32> {
235    match val {
236        Value::Time(t) => Ok(*t),
237        Value::Timestamp(_, t) => Ok(*t),
238        _ => Err(Error::protocol("expected a TIME value")),
239    }
240}
241
242/// Decodifica uma linha do stream a partir dos metadados das colunas de saída.
243/// `charset` é o charset da conexão, usado para decodificar CHAR/VARCHAR.
244pub fn decode_row(
245    stream: &mut FbStream,
246    columns: &[ColumnMeta],
247    charset: Charset,
248) -> Result<Vec<Value>> {
249    let bitmap = stream.read_raw(null_bitmap_len(columns.len()))?;
250    let mut values = Vec::with_capacity(columns.len());
251    for (i, col) in columns.iter().enumerate() {
252        let is_null = bitmap[i / 8] & (1 << (i % 8)) != 0;
253        if is_null {
254            values.push(Value::Null);
255        } else {
256            values.push(decode_value(stream, col, charset)?);
257        }
258    }
259    Ok(values)
260}
261
262fn decode_value(stream: &mut FbStream, col: &ColumnMeta, charset: Charset) -> Result<Value> {
263    Ok(match sql_type::base(col.sql_type) {
264        sql_type::SHORT => Value::Short(stream.read_i32()? as i16),
265        sql_type::LONG => Value::Int(stream.read_i32()?),
266        sql_type::INT64 => Value::BigInt(stream.read_i64()?),
267        sql_type::INT128 => {
268            let b = stream.read_raw(16)?;
269            Value::Int128(i128::from_be_bytes(b.try_into().unwrap()))
270        }
271        sql_type::DEC16 => {
272            let b = stream.read_raw(8)?;
273            Value::DecFloat(crate::decfloat::DecFloat::from_decimal64(
274                b.try_into().unwrap(),
275            ))
276        }
277        sql_type::DEC34 => {
278            let b = stream.read_raw(16)?;
279            Value::DecFloat(crate::decfloat::DecFloat::from_decimal128(
280                b.try_into().unwrap(),
281            ))
282        }
283        sql_type::FLOAT => Value::Float(f32::from_bits(stream.read_i32()? as u32)),
284        sql_type::DOUBLE | sql_type::D_FLOAT => Value::Double(stream.read_f64()?),
285        sql_type::TEXT => {
286            let n = col.length as usize;
287            let raw = stream.read_raw(n)?;
288            stream.read_pad(n)?;
289            text_or_bytes(col, raw, charset)
290        }
291        sql_type::VARYING => {
292            let raw = stream.read_bytes()?; // prefixado por comprimento + com padding
293            text_or_bytes(col, raw, charset)
294        }
295        sql_type::TYPE_DATE => Value::Date(stream.read_i32()?),
296        sql_type::TYPE_TIME => Value::Time(stream.read_i32()? as u32),
297        sql_type::TIMESTAMP => {
298            let date = stream.read_i32()?;
299            let time = stream.read_i32()? as u32;
300            Value::Timestamp(date, time)
301        }
302        // Tipos WITH TIME ZONE: pedimos o formato ESTENDIDO (`_EX`) no BLR de
303        // saída, então o servidor envia, além de UTC + zona, o offset resolvido.
304        // Cada componente é um inteiro XDR de 4 bytes (USHORT/SSHORT inclusive).
305        sql_type::TIME_TZ | sql_type::TIME_TZ_EX => {
306            let utc_time = stream.read_i32()? as u32;
307            let zone = stream.read_i32()? as u16;
308            let offset = stream.read_i32()? as i16;
309            Value::TimeTz(crate::value::TimeTz {
310                utc_time,
311                zone,
312                offset,
313            })
314        }
315        sql_type::TIMESTAMP_TZ | sql_type::TIMESTAMP_TZ_EX => {
316            let utc_date = stream.read_i32()?;
317            let utc_time = stream.read_i32()? as u32;
318            let zone = stream.read_i32()? as u16;
319            let offset = stream.read_i32()? as i16;
320            Value::TimestampTz(crate::value::TimestampTz {
321                utc_date,
322                utc_time,
323                zone,
324                offset,
325            })
326        }
327        sql_type::BLOB | sql_type::QUAD => Value::Blob(stream.read_quad()?),
328        // Coluna ARRAY: chega como um id de 8 bytes (quad), igual ao blob; os
329        // elementos são buscados à parte via op_get_slice (ver [`crate::array`]).
330        sql_type::ARRAY => Value::Array(stream.read_quad()?),
331        sql_type::BOOLEAN => {
332            let b = stream.read_raw(1)?;
333            stream.read_pad(1)?;
334            Value::Bool(b[0] != 0)
335        }
336        _ => {
337            // Tipo desconhecido: consome sua largura declarada como bytes opacos.
338            let n = col.xdr_len();
339            Value::Bytes(stream.read_raw(n)?)
340        }
341    })
342}
343
344/// Charset OCTETS (sub_type 1 para texto) permanece binário; todo o resto é
345/// decodificado conforme o charset da conexão (o servidor translitera o texto
346/// para esse charset antes de enviar).
347fn text_or_bytes(col: &ColumnMeta, raw: Vec<u8>, charset: Charset) -> Value {
348    const CS_OCTETS: i32 = 1;
349    if col.sub_type == CS_OCTETS {
350        Value::Bytes(raw)
351    } else {
352        let s = charset.decode(&raw);
353        // Remove o preenchimento (padding) à direita do CHAR; VARCHAR já carrega
354        // seus bytes exatos.
355        if sql_type::base(col.sql_type) == sql_type::TEXT {
356            Value::Text(s.trim_end_matches(' ').to_string())
357        } else {
358            Value::Text(s)
359        }
360    }
361}
362
363#[cfg(test)]
364mod tests {
365    use super::*;
366
367    fn col(sql_type: i32, length: i32) -> ColumnMeta {
368        ColumnMeta {
369            sql_type,
370            length,
371            ..Default::default()
372        }
373    }
374
375    #[test]
376    fn buffer_len_integer_and_varchar() {
377        // INTEGER + VARCHAR(20): confirmado por captura do cliente C (IBatch) = 30.
378        // int(4)@0 + null(2)@4 → 6; varchar(22)@6 + null(2)@28 → 30.
379        let cols = [col(sql_type::LONG, 4), col(sql_type::VARYING, 20)];
380        assert_eq!(message_buffer_len(&cols), 30);
381    }
382
383    #[test]
384    fn buffer_len_respects_alignment() {
385        // SMALLINT(2)@0 + null(2)@2 → 4; BIGINT alinha a 8 → 8, +8 → 16,
386        // null(2)@16 → 18.
387        let cols = [col(sql_type::SHORT, 2), col(sql_type::INT64, 8)];
388        assert_eq!(message_buffer_len(&cols), 18);
389    }
390
391    #[test]
392    fn encode_row_is_4_aligned() {
393        // Cada mensagem codificada deve terminar em fronteira de 4 bytes para que
394        // a concatenação no op_batch_msg permaneça alinhada.
395        let cols = [col(sql_type::LONG, 4), col(sql_type::VARYING, 20)];
396        let msg = encode_row(
397            &cols,
398            &[Value::Int(1), Value::Text("um".into())],
399            Charset::Utf8,
400        )
401        .unwrap();
402        assert_eq!(msg.len() % 4, 0);
403        assert_eq!(msg.len(), 16); // bitmap(4) + int(4) + len(4)+"um"+pad(2)=8
404    }
405}