Skip to main content

firebird_wire/
batch.rs

1//! DML em lote (batch) — o recurso "principal" de array DML do Firebird 4+.
2//!
3//! Um [`Batch`] insere/atualiza/exclui muitas linhas com uma única instrução
4//! preparada, acumulando mensagens no cliente e enviando-as ao servidor em uma
5//! ida só. É muito mais rápido que executar a instrução linha a linha.
6//!
7//! ```text
8//! let mut batch = conn.create_batch(&tx, "INSERT INTO t (a, b) VALUES (?, ?)")?;
9//! batch.add(&[Value::Int(1), Value::Text("um".into())])?;
10//! batch.add(&[Value::Int(2), Value::Text("dois".into())])?;
11//! let result = batch.execute(&mut conn, &tx)?;   // envia + executa
12//! println!("{} linhas afetadas no total", result.total_affected());
13//! batch.close(&mut conn)?;                        // libera no servidor
14//! ```
15//!
16//! # Protocolo (FB4+, op codes 99–103)
17//!
18//! Descoberto por captura de um cliente C usando a interface OO `IBatch`:
19//! 1. `op_batch_create` (99): `stmt | blr(cstring) | msglen(u32) | pb(cstring)`.
20//!    O BLR descreve o formato da mensagem (igual ao `in_blr` de `op_execute`);
21//!    `msglen` é o tamanho do buffer de mensagem do lado do cliente.
22//! 2. `op_batch_msg` (100): `stmt | count(u32) | mensagens`, cada mensagem no
23//!    mesmo formato compacto (bitmap de nulos + valores XDR) usado em `op_execute`.
24//! 3. `op_batch_exec` (101): `stmt | transaction`. Responde com `op_batch_cs`.
25//! 4. `op_batch_cs` (103): estado de conclusão (contagens por linha + erros).
26//! 5. `op_batch_rls` (102): libera o batch no servidor.
27//!
28//! O cliente C agrupa create+msg e usa `op_batch_sync` (110) para drenar as
29//! respostas adiadas; nós, sendo síncronos, lemos a resposta de cada op na hora.
30//!
31//! # BLOBs em lote (política STREAM, `op_batch_blob_stream` 105)
32//!
33//! Quando a instrução tem coluna BLOB, [`Connection::create_batch`] ativa a
34//! política `BLOB_STREAM` no PB (`TAG_BLOB_POLICY` = 3). O chamador então usa
35//! [`Batch::add_blob`] para registrar cada blob e recebe um id (quad) para pôr
36//! no campo da linha. Em [`Batch::execute`], antes das mensagens, enviamos:
37//!
38//! `op_batch_blob_stream` (105): `stmt | length(u32) | stream`. O `stream` é a
39//! concatenação CRUA (sem padding) dos blobs, cada um `id(quad 8B BE) | size(4B
40//! BE) | bpb_size(4B BE) | bpb | dados`. O `length` NÃO é o número de bytes no
41//! wire: é o tamanho do buffer que o servidor aloca — a soma de `align4(16 +
42//! bpb + dados)` de cada blob — e deve ser múltiplo de 4 (o servidor rejeita
43//! caso contrário). Ao percorrer o stream (ver `xdr_blob_stream` no servidor),
44//! ele avança o ponteiro do buffer com alinhamento de 4 SEM consumir bytes do
45//! wire para o padding; o laço termina quando o que resta é menor que um
46//! cabeçalho (16 B) ou chega a zero. Por isso o wire carrega menos bytes que
47//! `length`, e a próxima op pode começar em offset não múltiplo de 4.
48//! Confirmado por captura do `11.batch.cpp` (conteúdo 30 B → length 32; 33 B →
49//! 36) e pelo código do servidor. Os blobs vão **antes** das mensagens porque a
50//! linha referencia o id já registrado.
51//!
52//! `op_batch_regblob` (104): `stmt | id_existente(quad 8B BE) | id_batch(quad)`.
53//! Mapeia um BLOB já gravado fora do batch ([`Connection::write_blob`]) a um id
54//! local, evitando reenviar os dados. Ver [`Batch::register_blob`].
55//!
56//! `op_batch_set_bpb` (106): `stmt | bpb(cstring)`. Define o BPB padrão dos
57//! blobs do batch; o servidor lê o `isc_bpb_type` para marcar os blobs como
58//! segmentados ou stream. Ver [`Batch::set_default_bpb`] / [`Batch::set_segmented`].
59//! Em modo segmentado, cada segmento no stream é um `u32` big-endian com o
60//! comprimento seguido dos bytes, SEM padding; já o campo `size` do blob segue a
61//! contabilidade do buffer do servidor (cabeçalho de 2 bytes alinhado a 2), ou
62//! seja `align2(2 + len)`. Capturado da Parte 3 do `11.batch.cpp`.
63
64use crate::blr::message_blr;
65use crate::connection::Connection;
66use crate::error::{DatabaseError, Error, Result, StatusVector};
67use crate::message::{encode_row_into, message_buffer_len};
68use crate::transaction::Transaction;
69use crate::value::{ColumnMeta, Value};
70use crate::wire::consts::*;
71use crate::wire::response::{read_op, read_response, read_response_body, read_status_vector};
72use crate::wire::stream::{op_name, op_packet};
73use crate::wire::xdr::ParameterBuffer;
74
75/// Um lote de mensagens vinculado a uma instrução preparada no servidor.
76///
77/// Crie-o com [`Connection::create_batch`], acumule linhas com [`Batch::add`] e
78/// envie com [`Batch::execute`]. O batch pode ser reutilizado (adicione mais e
79/// execute de novo). Ao terminar, chame [`Batch::close`] para liberá-lo.
80pub struct Batch {
81    handle: i32,
82    params: Vec<ColumnMeta>,
83    /// Mensagens já codificadas mas ainda não enviadas/executadas.
84    pending: Vec<u8>,
85    pending_count: u32,
86    /// Stream de BLOBs acumulado (cabeçalho + dados de cada blob), ainda não
87    /// enviado. Só usado em batches com coluna BLOB (política STREAM). Os blobs
88    /// vão concatenados SEM padding no wire (o servidor reconstrói o alinhamento
89    /// no buffer dele).
90    pending_blobs: Vec<u8>,
91    /// Comprimento *alinhado* do stream de blobs (campo `p_batch_blob_data` do
92    /// op_batch_blob_stream): a soma de `align4(16 + dados)` de cada blob. É o
93    /// tamanho do buffer que o servidor aloca, e deve ser múltiplo de
94    /// `BLOB_ALIGN` (o servidor rejeita caso contrário). Sempre `>=` o número de
95    /// bytes efetivamente enviados.
96    blob_stream_len: usize,
97    /// Registros pendentes de BLOBs pré-existentes (`op_batch_regblob`): cada par
98    /// é `(id_existente, id_no_batch)`. Enviados em [`Batch::execute`] antes das
99    /// mensagens, junto com o stream de blobs.
100    pending_regblobs: Vec<(u64, u64)>,
101    /// Próximo id (quad) a atribuir em [`Batch::add_blob`]/[`Batch::register_blob`].
102    /// Começa em 1; o id 0 é reservado (quad nulo).
103    next_blob_id: u64,
104    /// Verdadeiro se a instrução tem ao menos uma coluna BLOB (política STREAM
105    /// ativada na criação).
106    blob_stream: bool,
107    /// Charset da conexão, usado para codificar parâmetros de texto em
108    /// [`Batch::add`] (capturado na criação, já que `add` não recebe a conexão).
109    charset: crate::charset::Charset,
110    /// BPB padrão a aplicar aos blobs do batch (`op_batch_set_bpb`), se definido.
111    /// Enviado em [`Batch::execute`] antes do stream de blobs.
112    default_bpb: Option<Vec<u8>>,
113    /// Verdadeiro quando o BPB padrão marca os blobs como segmentados; nesse caso
114    /// [`Batch::add_blob`] enquadra os dados em segmentos.
115    blob_segmented: bool,
116    closed: bool,
117}
118
119/// Alinhamento do cabeçalho de segmento em um blob segmentado
120/// (`IBatch::BLOB_SEGHDR_ALIGN`). Como cada `add_blob` segmentado começa numa
121/// fronteira de 4 bytes (já múltiplo de 2), não há padding a inserir.
122const BLOB_SEGHDR_ALIGN: usize = 2;
123const _: () = assert!(BLOB_ALIGN.is_multiple_of(BLOB_SEGHDR_ALIGN));
124
125/// Arredonda `n` para cima até um múltiplo de `align` (potência de 2).
126#[inline]
127fn align_up(n: usize, align: usize) -> usize {
128    (n + align - 1) & !(align - 1)
129}
130
131/// Verdadeiro se o BPB marca o blob como segmentado: clumplet `isc_bpb_type`(3)
132/// com valor `isc_bpb_type_segmented`(0). Formato do BPB: byte de versão seguido
133/// de clumplets `tag(1) | len(1) | valor(len)`.
134fn bpb_is_segmented(bpb: &[u8]) -> bool {
135    let mut i = 1; // pula o byte de versão
136    while i + 1 < bpb.len() {
137        let tag = bpb[i];
138        let len = bpb[i + 1] as usize;
139        let val = &bpb[i + 2..(i + 2 + len).min(bpb.len())];
140        if tag == bpb::TYPE {
141            return val.first() == Some(&bpb::TYPE_SEGMENTED);
142        }
143        i += 2 + len;
144    }
145    false
146}
147
148/// Alinhamento (em bytes) de cada blob dentro do stream de `op_batch_blob_stream`.
149/// O servidor FB5 usa 4 (confirmado por captura: conteúdo de 30 B → campo de
150/// comprimento 32; de 33 B → 36). É o valor que `IBatch::getBlobAlignment`
151/// retorna para o protocolo remoto.
152const BLOB_ALIGN: usize = 4;
153
154impl Batch {
155    /// O handle da instrução/batch no servidor.
156    pub fn handle(&self) -> i32 {
157        self.handle
158    }
159
160    /// Quantas mensagens estão acumuladas mas ainda não executadas.
161    pub fn pending(&self) -> u32 {
162        self.pending_count
163    }
164
165    /// Tamanho (alinhado) do stream de BLOBs acumulado mas ainda não enviado, em
166    /// bytes. Zera após [`Batch::execute`]/[`Batch::cancel`]. Útil para o chamador
167    /// drenar o lote por *volume de blob* (e não só por contagem de linhas) e assim
168    /// não estourar o buffer de batch do servidor (`TAG_BUFFER_BYTES_SIZE`).
169    pub fn blob_stream_len(&self) -> usize {
170        self.blob_stream_len
171    }
172
173    /// Os metadados dos parâmetros de entrada (a forma de cada linha esperada).
174    pub fn params(&self) -> &[ColumnMeta] {
175        &self.params
176    }
177
178    /// Registra um BLOB no stream do lote e devolve seu id (quad), que deve ser
179    /// posto no campo BLOB da linha correspondente como [`Value::Blob`].
180    ///
181    /// Só funciona em batches cuja instrução tem coluna BLOB (a política STREAM é
182    /// ativada automaticamente em [`Connection::create_batch`] nesse caso). Como
183    /// [`Self::add`], apenas acumula na memória; os blobs vão à rede em
184    /// [`Self::execute`], **antes** das mensagens que os referenciam.
185    ///
186    /// ```text
187    /// let id = batch.add_blob(b"conteudo do blob")?;
188    /// batch.add(&[Value::Text("BAT31".into()), Value::Blob(id)])?;
189    /// ```
190    pub fn add_blob(&mut self, data: &[u8]) -> Result<u64> {
191        if !self.blob_stream {
192            return Err(Error::protocol(
193                "add_blob exige uma coluna BLOB na instrução do batch",
194            ));
195        }
196        let id = self.next_blob_id;
197        self.next_blob_id += 1;
198        // Cabeçalho do blob, tudo big-endian: id(quad 8B) | size(4B) | bpb_size(4B
199        // = 0, pois o BPB é do batch). Os blobs vão concatenados SEM padding; o
200        // servidor reconstrói o alinhamento no buffer dele.
201        self.pending_blobs.extend_from_slice(&id.to_be_bytes());
202        if self.blob_segmented {
203            // Blob segmentado (um único segmento aqui). NO WIRE: cada segmento é
204            // `u32` big-endian com o comprimento + os bytes, sem padding. Mas o
205            // campo `size` (e o comprimento do stream) seguem a contabilidade do
206            // *buffer* do servidor, que usa cabeçalho de 2 bytes alinhado a
207            // `BLOB_SEGHDR_ALIGN`: `size = align2(2 + len)`. Ver `xdr_blob_stream`.
208            let size_field = align_up(2 + data.len(), BLOB_SEGHDR_ALIGN);
209            self.pending_blobs
210                .extend_from_slice(&(size_field as u32).to_be_bytes());
211            self.pending_blobs.extend_from_slice(&0u32.to_be_bytes()); // bpb_size
212            self.pending_blobs
213                .extend_from_slice(&(data.len() as u32).to_be_bytes());
214            self.pending_blobs.extend_from_slice(data);
215            self.blob_stream_len += align_up(16 + size_field, BLOB_ALIGN);
216        } else {
217            // Blob de stream (não segmentado): dados crus.
218            self.pending_blobs
219                .extend_from_slice(&(data.len() as u32).to_be_bytes());
220            self.pending_blobs.extend_from_slice(&0u32.to_be_bytes()); // bpb_size
221            self.pending_blobs.extend_from_slice(data);
222            self.blob_stream_len += align_up(16 + data.len(), BLOB_ALIGN);
223        }
224        Ok(id)
225    }
226
227    /// Define o BPB (blob parameter buffer) padrão dos blobs do batch
228    /// (`op_batch_set_bpb`). O uso típico é marcar os blobs como **segmentados**
229    /// (`isc_bpb_type = isc_bpb_type_segmented`): nesse caso [`Self::add_blob`]
230    /// passa a enquadrar cada blob como um segmento. Deve ser chamado antes de
231    /// [`Self::add_blob`]; o BPB vai à rede em [`Self::execute`], antes dos blobs.
232    ///
233    /// Conveniência: [`Self::set_segmented`] monta o BPB segmentado para você.
234    pub fn set_default_bpb(&mut self, bpb: Vec<u8>) -> Result<()> {
235        if !self.blob_stream {
236            return Err(Error::protocol(
237                "set_default_bpb exige uma coluna BLOB na instrução do batch",
238            ));
239        }
240        // Detecta o tipo segmentado: clumplet `isc_bpb_type`(3) com valor
241        // `isc_bpb_type_segmented`(0). BPB usa comprimento de 1 byte por clumplet.
242        self.blob_segmented = bpb_is_segmented(&bpb);
243        self.default_bpb = Some(bpb);
244        Ok(())
245    }
246
247    /// Atalho para [`Self::set_default_bpb`] com um BPB que marca os blobs como
248    /// segmentados (`segmentado = true`) ou stream (`false`, o padrão).
249    pub fn set_segmented(&mut self, segmented: bool) -> Result<()> {
250        let kind = if segmented {
251            bpb::TYPE_SEGMENTED
252        } else {
253            bpb::TYPE_STREAM
254        };
255        // BPB no formato que o fbclient envia: versão(1) | tag isc_bpb_type(3) |
256        // len(1) = 4 | valor(4 bytes LE). Capturado do `11.batch.cpp` Parte 3.
257        self.set_default_bpb(vec![BPB_VERSION1, bpb::TYPE, 4, kind, 0, 0, 0])
258    }
259
260    /// Mapeia um BLOB **já existente** (criado fora do batch, p.ex. via
261    /// [`Connection::write_blob`]/[`Connection::create_blob`]) para um id local
262    /// do batch e devolve esse id, que deve ir no campo BLOB da linha como
263    /// [`Value::Blob`]. Útil para reaproveitar BLOBs grandes já gravados sem
264    /// reenviá-los pelo stream (`op_batch_regblob`).
265    ///
266    /// Como [`Self::add_blob`], só vale em batches com coluna BLOB; o registro é
267    /// acumulado e enviado em [`Self::execute`], antes das mensagens.
268    pub fn register_blob(&mut self, existing_id: u64) -> Result<u64> {
269        if !self.blob_stream {
270            return Err(Error::protocol(
271                "register_blob exige uma coluna BLOB na instrução do batch",
272            ));
273        }
274        let batch_id = self.next_blob_id;
275        self.next_blob_id += 1;
276        self.pending_regblobs.push((existing_id, batch_id));
277        Ok(batch_id)
278    }
279
280    /// Adiciona uma linha ao lote. Os valores devem corresponder, em número e
281    /// tipo, aos parâmetros da instrução (ver [`Self::params`]). Apenas acumula
282    /// na memória; nada vai à rede até [`Self::execute`].
283    pub fn add(&mut self, values: &[Value]) -> Result<()> {
284        // Codifica direto no buffer pendente (sem Vec temporário por linha). Em erro,
285        // `encode_row_into` restaura o tamanho de `pending`, então não corrompe o lote.
286        encode_row_into(&mut self.pending, &self.params, values, self.charset)?;
287        self.pending_count += 1;
288        Ok(())
289    }
290
291    /// Envia as mensagens acumuladas e executa o lote, retornando o estado de
292    /// conclusão (contagens por linha e erros por linha). Esvazia o buffer
293    /// pendente; o batch pode então ser reutilizado.
294    pub fn execute(&mut self, conn: &mut Connection, tx: &Transaction) -> Result<BatchResult> {
295        if self.closed {
296            return Err(Error::protocol("batch já foi fechado"));
297        }
298        // 0a. Define o BPB padrão (op_batch_set_bpb), se houver, antes dos blobs.
299        if let Some(bpb) = self.default_bpb.take() {
300            let mut w = op_packet(op::BATCH_SET_BPB);
301            w.put_i32(self.handle);
302            w.put_bytes(&bpb); // cstring: len(4) + bpb + pad
303            conn.io().send(&w)?;
304            read_response(conn.io())?;
305        }
306        // 0b. Envia os BLOBs pendentes (op_batch_blob_stream) ANTES das mensagens,
307        //    pois cada mensagem referencia um id de blob já registrado.
308        if !self.pending_blobs.is_empty() {
309            let mut w = op_packet(op::BATCH_BLOB_STREAM);
310            w.put_i32(self.handle);
311            // Comprimento alinhado (≥ bytes no wire), depois os blobs crus. Não
312            // alinhamos o pacote: o servidor recompõe o alinhamento internamente
313            // e a próxima op pode começar em offset não múltiplo de 4.
314            w.put_i32(self.blob_stream_len as i32);
315            w.put_raw(&self.pending_blobs);
316            conn.io().send(&w)?;
317            read_response(conn.io())?;
318            self.pending_blobs.clear();
319            self.blob_stream_len = 0;
320        }
321        // 0b. Registra BLOBs pré-existentes (op_batch_regblob), também antes das
322        //     mensagens. Layout: stmt | id_existente(quad 8B BE) | id_batch(quad).
323        if !self.pending_regblobs.is_empty() {
324            for (existing_id, batch_id) in std::mem::take(&mut self.pending_regblobs) {
325                let mut w = op_packet(op::BATCH_REGBLOB);
326                w.put_i32(self.handle);
327                w.put_raw(&existing_id.to_be_bytes());
328                w.put_raw(&batch_id.to_be_bytes());
329                conn.io().send(&w)?;
330                read_response(conn.io())?;
331            }
332        }
333        // 1. Envia as mensagens pendentes (op_batch_msg), se houver.
334        if self.pending_count > 0 {
335            let mut w = op_packet(op::BATCH_MSG);
336            w.put_i32(self.handle);
337            w.put_i32(self.pending_count as i32);
338            w.put_raw(&self.pending);
339            w.align();
340            conn.io().send(&w)?;
341            read_response(conn.io())?;
342            self.pending.clear();
343            self.pending_count = 0;
344        }
345
346        // 2. Executa o lote (op_batch_exec) e lê o estado de conclusão.
347        let mut w = op_packet(op::BATCH_EXEC);
348        w.put_i32(self.handle);
349        w.put_i32(tx.handle());
350        conn.io().send(&w)?;
351        read_batch_cs(conn)
352    }
353
354    /// Descarta as mensagens acumuladas que ainda não foram executadas
355    /// (`op_batch_cancel`). Não afeta linhas já executadas em chamadas anteriores.
356    pub fn cancel(&mut self, conn: &mut Connection) -> Result<()> {
357        self.pending.clear();
358        self.pending_count = 0;
359        self.pending_blobs.clear();
360        self.blob_stream_len = 0;
361        self.pending_regblobs.clear();
362        let mut w = op_packet(op::BATCH_CANCEL);
363        w.put_i32(self.handle);
364        conn.io().send(&w)?;
365        read_response(conn.io())?;
366        Ok(())
367    }
368
369    /// Libera o batch e a instrução preparada no servidor (`op_batch_rls` +
370    /// `op_free_statement` com `DSQL_drop`), consumindo o handle.
371    pub fn close(mut self, conn: &mut Connection) -> Result<()> {
372        self.closed = true;
373        let mut w = op_packet(op::BATCH_RLS);
374        w.put_i32(self.handle);
375        conn.io().send(&w)?;
376        read_response(conn.io())?;
377
378        let mut w = op_packet(op::FREE_STATEMENT);
379        w.put_i32(self.handle);
380        w.put_i32(free::DROP);
381        conn.io().send(&w)?;
382        read_response(conn.io())?;
383        Ok(())
384    }
385}
386
387impl Drop for Batch {
388    fn drop(&mut self) {
389        if !self.closed {
390            crate::warn_unclosed("Batch", self.handle);
391        }
392    }
393}
394
395/// Opções de criação de um [`Batch`] (clumplets do `op_batch_create`).
396#[derive(Debug, Clone, Copy)]
397pub struct BatchOptions {
398    /// Se `true`, o servidor CONTINUA após uma linha falhar, executando as demais e
399    /// reportando o erro de cada uma em [`BatchResult::errors`]. Para isso ele
400    /// bracketa cada linha num savepoint interno — o que tem **custo por linha**.
401    ///
402    /// Se `false` (**padrão**), o lote PARA na primeira linha que falha (fail-fast):
403    /// é bem mais rápido (sem savepoint por linha) e é o que se quer quando a
404    /// transação é "tudo ou nada" e qualquer erro aborta a operação inteira.
405    pub multierror: bool,
406    /// Se `true` (**padrão**), o servidor reporta a contagem de linhas afetadas por
407    /// mensagem em [`BatchResult::update_counts`] (e portanto [`BatchResult::total_affected`]).
408    pub record_counts: bool,
409    /// Tamanho máximo (em bytes) do buffer de batch que o servidor aloca
410    /// (`TAG_BUFFER_BYTES_SIZE`). `None` (**padrão**) deixa o servidor usar o seu
411    /// default. Aumentar permite acumular mais mensagens/blobs por `execute` sem
412    /// estourar o buffer (o estouro aparece como "invalid BLOB ID"); o servidor
413    /// impõe um teto próprio (256 MiB no Firebird).
414    pub buffer_bytes: Option<u32>,
415}
416
417impl Default for BatchOptions {
418    fn default() -> Self {
419        // Fail-fast por padrão: a maioria dos usos de DML em lote quer abortar no
420        // primeiro erro, e o multierror (savepoint por linha) só compensa quando o
421        // chamador realmente vai inspecionar o erro de cada linha.
422        BatchOptions {
423            multierror: false,
424            record_counts: true,
425            buffer_bytes: None,
426        }
427    }
428}
429
430impl BatchOptions {
431    /// Opções padrão (fail-fast, com contagens por linha).
432    pub fn new() -> Self {
433        Self::default()
434    }
435
436    /// Liga/desliga o modo multierro (continuar após erros por linha).
437    pub fn multierror(mut self, on: bool) -> Self {
438        self.multierror = on;
439        self
440    }
441
442    /// Liga/desliga o reporte de contagens de linhas afetadas por mensagem.
443    pub fn record_counts(mut self, on: bool) -> Self {
444        self.record_counts = on;
445        self
446    }
447
448    /// Define o tamanho do buffer de batch do servidor (`TAG_BUFFER_BYTES_SIZE`),
449    /// em bytes. Use para acomodar lotes maiores (mais mensagens/blobs por
450    /// `execute`) sem estourar o buffer. O servidor impõe um teto (256 MiB no
451    /// Firebird); valores acima são limitados por ele.
452    pub fn buffer_bytes(mut self, bytes: u32) -> Self {
453        self.buffer_bytes = Some(bytes);
454        self
455    }
456}
457
458impl Connection {
459    /// Prepara uma instrução e cria um lote (batch) sobre ela com as opções padrão
460    /// ([`BatchOptions::default`]: fail-fast, com contagens por linha). A instrução
461    /// deve ter parâmetros (`?`) — cada [`Batch::add`] fornece uma linha de valores.
462    ///
463    /// Para continuar após erros por linha e coletá-los todos, use
464    /// [`Self::create_batch_with`] com `BatchOptions::new().multierror(true)`.
465    pub fn create_batch(&mut self, tx: &Transaction, sql: &str) -> Result<Batch> {
466        self.create_batch_with(tx, sql, BatchOptions::default())
467    }
468
469    /// Como [`Self::create_batch`], mas com [`BatchOptions`] explícitas.
470    pub fn create_batch_with(
471        &mut self,
472        tx: &Transaction,
473        sql: &str,
474        opts: BatchOptions,
475    ) -> Result<Batch> {
476        let mut stmt = self.prepare(tx, sql)?;
477        let handle = stmt.handle();
478        let params: Vec<ColumnMeta> = stmt.params().to_vec();
479        // O handle passa a viver no Batch (liberado por Batch::close), então
480        // marcamos como transferido para não disparar o aviso de Drop e soltamos
481        // o wrapper aqui (libera só memória).
482        stmt.forget_handle();
483        drop(stmt);
484
485        let blr = message_blr(&params);
486        let msglen = message_buffer_len(&params);
487
488        // Se houver coluna BLOB, ativa a política STREAM: os blobs são enviados
489        // em `op_batch_blob_stream` e a linha referencia o id (ver `add_blob`).
490        let blob_stream = params
491            .iter()
492            .any(|c| sql_type::base(c.sql_type) == sql_type::BLOB);
493
494        // Buffer de parâmetros do batch: byte de versão (1) seguido de clumplets
495        // com comprimento LE de 4 bytes. Os tags são opcionais (ver `BatchOptions`).
496        let mut pb = ParameterBuffer::new(1);
497        if opts.record_counts {
498            pb.bytes_be_len4(batch_tag::RECORD_COUNTS, &1u32.to_le_bytes());
499        }
500        if opts.multierror {
501            pb.bytes_be_len4(batch_tag::MULTIERROR, &1u32.to_le_bytes());
502        }
503        if let Some(bytes) = opts.buffer_bytes {
504            pb.bytes_be_len4(batch_tag::BUFFER_BYTES_SIZE, &bytes.to_le_bytes());
505        }
506        if blob_stream {
507            pb.bytes_be_len4(
508                batch_tag::BLOB_POLICY,
509                &(blob_policy::STREAM as u32).to_le_bytes(),
510            );
511        }
512
513        let mut w = op_packet(op::BATCH_CREATE);
514        w.put_i32(handle);
515        w.put_bytes(&blr); // cstring: len(4) + blr + pad
516        w.put_i32(msglen as i32);
517        w.put_bytes(pb.as_slice()); // cstring: len(4) + pb + pad
518        self.io().send(&w)?;
519        read_response(self.io())?;
520
521        Ok(Batch {
522            handle,
523            params,
524            pending: Vec::new(),
525            pending_count: 0,
526            pending_blobs: Vec::new(),
527            blob_stream_len: 0,
528            pending_regblobs: Vec::new(),
529            next_blob_id: 1,
530            blob_stream,
531            charset: self.charset(),
532            default_bpb: None,
533            blob_segmented: false,
534            closed: false,
535        })
536    }
537}
538
539/// Resultado da execução de um lote: o estado de conclusão por mensagem.
540#[derive(Debug, Clone, Default)]
541pub struct BatchResult {
542    /// Total de mensagens processadas nesta execução.
543    pub total: u32,
544    /// Contagem de linhas afetadas por mensagem, na ordem em que foram
545    /// adicionadas. `>= 0` é o número de linhas; [`batch_cs::EXECUTE_FAILED`]
546    /// (−1) marca uma mensagem que falhou; [`batch_cs::SUCCESS_NO_INFO`] (−2)
547    /// indica sucesso sem contagem reportada.
548    pub update_counts: Vec<i32>,
549    /// Erros detalhados por mensagem (índice da mensagem + erro do servidor).
550    pub errors: Vec<BatchError>,
551}
552
553impl BatchResult {
554    /// Verdadeiro se nenhuma mensagem falhou.
555    pub fn all_succeeded(&self) -> bool {
556        self.errors.is_empty() && !self.update_counts.contains(&batch_cs::EXECUTE_FAILED)
557    }
558
559    /// Soma das linhas afetadas pelas mensagens bem-sucedidas (ignora as que
560    /// falharam ou não reportaram contagem).
561    pub fn total_affected(&self) -> u64 {
562        self.update_counts
563            .iter()
564            .filter(|&&c| c >= 0)
565            .map(|&c| c as u64)
566            .sum()
567    }
568}
569
570/// Um erro detalhado de uma mensagem específica do lote.
571#[derive(Debug, Clone)]
572pub struct BatchError {
573    /// Índice (base zero) da mensagem que falhou, na ordem de adição.
574    pub message_index: u32,
575    /// O erro reportado pelo servidor para essa mensagem.
576    pub error: DatabaseError,
577}
578
579/// Lê a resposta `op_batch_cs` de um `op_batch_exec`.
580///
581/// Layout (confirmado por captura, inclusive com erros forçados):
582/// `op | stmt | reccount | updates | vectors | errors |`
583/// `updates×i32 (contagens) | vectors×(pos u32 + status vector) | errors×u32`.
584fn read_batch_cs(conn: &mut Connection) -> Result<BatchResult> {
585    let code = read_op(conn.io())?;
586    if code == op::RESPONSE {
587        // Falha global (não por linha) veio como op_response.
588        read_response_body(conn.io())?.into_result()?;
589        return Err(Error::protocol(
590            "op_batch_exec retornou op_response sem erro",
591        ));
592    }
593    if code != op::BATCH_CS {
594        return Err(Error::protocol(format!(
595            "esperava op_batch_cs, veio {} ({code})",
596            op_name(code)
597        )));
598    }
599
600    let _stmt = conn.io().read_i32()?;
601    let reccount = conn.io().read_i32()? as u32;
602    let updates = conn.io().read_i32()? as u32;
603    let vectors = conn.io().read_i32()? as u32;
604    let errors = conn.io().read_i32()? as u32;
605
606    let mut update_counts = Vec::with_capacity(updates as usize);
607    for _ in 0..updates {
608        update_counts.push(conn.io().read_i32()?);
609    }
610
611    let mut batch_errors = Vec::with_capacity(vectors as usize);
612    for _ in 0..vectors {
613        let pos = conn.io().read_i32()? as u32;
614        let status = read_status_vector(conn.io())?;
615        batch_errors.push(BatchError {
616            message_index: pos,
617            error: DatabaseError::new(status),
618        });
619    }
620    // Lista simples de posições com erro (quando os detalhes não são pedidos).
621    for _ in 0..errors {
622        let pos = conn.io().read_i32()? as u32;
623        if !batch_errors.iter().any(|e| e.message_index == pos) {
624            let empty = StatusVector {
625                args: Vec::new(),
626                sql_state: None,
627            };
628            batch_errors.push(BatchError {
629                message_index: pos,
630                error: DatabaseError::new(empty),
631            });
632        }
633    }
634
635    Ok(BatchResult {
636        total: reccount,
637        update_counts,
638        errors: batch_errors,
639    })
640}
641
642#[cfg(test)]
643mod tests {
644    use super::*;
645
646    /// Cria um `Batch` "de mentira" (sem conexão/servidor) só para exercitar a
647    /// contabilidade EM MEMÓRIA de `add_blob`. `closed: true` evita o aviso de Drop.
648    fn fake_blob_batch() -> Batch {
649        Batch {
650            handle: 0,
651            params: Vec::new(),
652            pending: Vec::new(),
653            pending_count: 0,
654            pending_blobs: Vec::new(),
655            blob_stream_len: 0,
656            pending_regblobs: Vec::new(),
657            next_blob_id: 1,
658            blob_stream: true,
659            charset: crate::charset::Charset::Utf8,
660            default_bpb: None,
661            blob_segmented: false,
662            closed: true,
663        }
664    }
665
666    /// Bytes que cada blob (não-segmentado) ocupa no buffer do servidor:
667    /// `align4(16 + dados)` (cabeçalho de 16 B + dados, alinhado a `BLOB_ALIGN`).
668    fn entry_len(data_len: usize) -> usize {
669        align_up(16 + data_len, BLOB_ALIGN)
670    }
671
672    #[test]
673    fn add_blob_atribui_ids_sequenciais_a_partir_de_1() {
674        let mut b = fake_blob_batch();
675        assert_eq!(b.add_blob(b"a").unwrap(), 1);
676        assert_eq!(b.add_blob(b"bb").unwrap(), 2);
677        assert_eq!(b.add_blob(b"ccc").unwrap(), 3);
678    }
679
680    #[test]
681    fn add_blob_exige_coluna_blob() {
682        let mut b = fake_blob_batch();
683        b.blob_stream = false;
684        assert!(b.add_blob(b"x").is_err());
685    }
686
687    /// O invariante do qual o flush por bytes depende: `blob_stream_len()` é a soma
688    /// de `align4(16 + dados)` de cada blob — inclusive (e principalmente) para
689    /// tamanhos NÃO múltiplos de 4, que arredondam pra cima.
690    #[test]
691    fn blob_stream_len_acumula_tamanhos_alinhados() {
692        let mut b = fake_blob_batch();
693        for data in [&b"x"[..], &b"yy"[..], &b"zzz"[..], &b"wwww"[..]] {
694            b.add_blob(data).unwrap();
695        }
696        let esperado = entry_len(1) + entry_len(2) + entry_len(3) + entry_len(4);
697        assert_eq!(b.blob_stream_len(), esperado);
698        // A soma é sempre múltipla de BLOB_ALIGN (cada parcela é).
699        assert_eq!(b.blob_stream_len() % BLOB_ALIGN, 0);
700    }
701
702    /// Casos pontuais do tamanho declarado para um único blob (cabeçalho 16 B +
703    /// dados, alinhado a 4): 30 -> align4(46)=48; 33 -> align4(49)=52.
704    #[test]
705    fn blob_stream_len_de_um_unico_blob() {
706        let mut b = fake_blob_batch();
707        b.add_blob(&[0u8; 30]).unwrap();
708        assert_eq!(b.blob_stream_len(), 48);
709
710        let mut b = fake_blob_batch();
711        b.add_blob(&[0u8; 33]).unwrap();
712        assert_eq!(b.blob_stream_len(), 52);
713    }
714
715    /// `BatchOptions` por padrão não fixa o buffer (deixa o default do servidor); o
716    /// builder `buffer_bytes` registra o limite a enviar como `TAG_BUFFER_BYTES_SIZE`.
717    #[test]
718    fn batch_options_buffer_bytes() {
719        assert_eq!(BatchOptions::new().buffer_bytes, None);
720        let opts = BatchOptions::new().buffer_bytes(16 * 1024 * 1024);
721        assert_eq!(opts.buffer_bytes, Some(16 * 1024 * 1024));
722    }
723}