Skip to main content

firebird_wire/
batch.rs

1//! DML em lote (batch) — o recurso "principal" de array DML do Firebird 4+.
2//!
3//! Um [`Batch`] insere/atualiza/exclui muitas linhas com uma única instrução
4//! preparada, acumulando mensagens no cliente e enviando-as ao servidor em uma
5//! ida só. É muito mais rápido que executar a instrução linha a linha.
6//!
7//! ```text
8//! let mut batch = conn.create_batch(&tx, "INSERT INTO t (a, b) VALUES (?, ?)")?;
9//! batch.add(&[Value::Int(1), Value::Text("um".into())])?;
10//! batch.add(&[Value::Int(2), Value::Text("dois".into())])?;
11//! let result = batch.execute(&mut conn, &tx)?;   // envia + executa
12//! println!("{} linhas afetadas no total", result.total_affected());
13//! batch.close(&mut conn)?;                        // libera no servidor
14//! ```
15//!
16//! # Protocolo (FB4+, op codes 99–103)
17//!
18//! Descoberto por captura de um cliente C usando a interface OO `IBatch`:
19//! 1. `op_batch_create` (99): `stmt | blr(cstring) | msglen(u32) | pb(cstring)`.
20//!    O BLR descreve o formato da mensagem (igual ao `in_blr` de `op_execute`);
21//!    `msglen` é o tamanho do buffer de mensagem do lado do cliente.
22//! 2. `op_batch_msg` (100): `stmt | count(u32) | mensagens`, cada mensagem no
23//!    mesmo formato compacto (bitmap de nulos + valores XDR) usado em `op_execute`.
24//! 3. `op_batch_exec` (101): `stmt | transaction`. Responde com `op_batch_cs`.
25//! 4. `op_batch_cs` (103): estado de conclusão (contagens por linha + erros).
26//! 5. `op_batch_rls` (102): libera o batch no servidor.
27//!
28//! O cliente C agrupa create+msg e usa `op_batch_sync` (110) para drenar as
29//! respostas adiadas; nós, sendo síncronos, lemos a resposta de cada op na hora.
30//!
31//! # BLOBs em lote (política STREAM, `op_batch_blob_stream` 105)
32//!
33//! Quando a instrução tem coluna BLOB, [`Connection::create_batch`] ativa a
34//! política `BLOB_STREAM` no PB (`TAG_BLOB_POLICY` = 3). O chamador então usa
35//! [`Batch::add_blob`] para registrar cada blob e recebe um id (quad) para pôr
36//! no campo da linha. Em [`Batch::execute`], antes das mensagens, enviamos:
37//!
38//! `op_batch_blob_stream` (105): `stmt | length(u32) | stream`. O `stream` é a
39//! concatenação CRUA (sem padding) dos blobs, cada um `id(quad 8B BE) | size(4B
40//! BE) | bpb_size(4B BE) | bpb | dados`. O `length` NÃO é o número de bytes no
41//! wire: é o tamanho do buffer que o servidor aloca — a soma de `align4(16 +
42//! bpb + dados)` de cada blob — e deve ser múltiplo de 4 (o servidor rejeita
43//! caso contrário). Ao percorrer o stream (ver `xdr_blob_stream` no servidor),
44//! ele avança o ponteiro do buffer com alinhamento de 4 SEM consumir bytes do
45//! wire para o padding; o laço termina quando o que resta é menor que um
46//! cabeçalho (16 B) ou chega a zero. Por isso o wire carrega menos bytes que
47//! `length`, e a próxima op pode começar em offset não múltiplo de 4.
48//! Confirmado por captura do `11.batch.cpp` (conteúdo 30 B → length 32; 33 B →
49//! 36) e pelo código do servidor. Os blobs vão **antes** das mensagens porque a
50//! linha referencia o id já registrado.
51//!
52//! `op_batch_regblob` (104): `stmt | id_existente(quad 8B BE) | id_batch(quad)`.
53//! Mapeia um BLOB já gravado fora do batch ([`Connection::write_blob`]) a um id
54//! local, evitando reenviar os dados. Ver [`Batch::register_blob`].
55//!
56//! `op_batch_set_bpb` (106): `stmt | bpb(cstring)`. Define o BPB padrão dos
57//! blobs do batch; o servidor lê o `isc_bpb_type` para marcar os blobs como
58//! segmentados ou stream. Ver [`Batch::set_default_bpb`] / [`Batch::set_segmented`].
59//! Em modo segmentado, cada segmento no stream é um `u32` big-endian com o
60//! comprimento seguido dos bytes, SEM padding; já o campo `size` do blob segue a
61//! contabilidade do buffer do servidor (cabeçalho de 2 bytes alinhado a 2), ou
62//! seja `align2(2 + len)`. Capturado da Parte 3 do `11.batch.cpp`.
63
64use crate::blr::message_blr;
65use crate::connection::Connection;
66use crate::error::{DatabaseError, Error, Result, StatusVector};
67use crate::message::{encode_row_into, message_buffer_len};
68use crate::transaction::Transaction;
69use crate::value::{ColumnMeta, Value};
70use crate::wire::consts::*;
71use crate::wire::response::{read_op, read_response, read_response_body, read_status_vector};
72use crate::wire::stream::{op_name, op_packet};
73use crate::wire::xdr::ParameterBuffer;
74
75/// Um lote de mensagens vinculado a uma instrução preparada no servidor.
76///
77/// Crie-o com [`Connection::create_batch`], acumule linhas com [`Batch::add`] e
78/// envie com [`Batch::execute`]. O batch pode ser reutilizado (adicione mais e
79/// execute de novo). Ao terminar, chame [`Batch::close`] para liberá-lo.
80pub struct Batch {
81    handle: i32,
82    params: Vec<ColumnMeta>,
83    /// Mensagens já codificadas mas ainda não enviadas/executadas.
84    pending: Vec<u8>,
85    pending_count: u32,
86    /// Stream de BLOBs acumulado (cabeçalho + dados de cada blob), ainda não
87    /// enviado. Só usado em batches com coluna BLOB (política STREAM). Os blobs
88    /// vão concatenados SEM padding no wire (o servidor reconstrói o alinhamento
89    /// no buffer dele).
90    pending_blobs: Vec<u8>,
91    /// Comprimento *alinhado* do stream de blobs (campo `p_batch_blob_data` do
92    /// op_batch_blob_stream): a soma de `align4(16 + dados)` de cada blob. É o
93    /// tamanho do buffer que o servidor aloca, e deve ser múltiplo de
94    /// `BLOB_ALIGN` (o servidor rejeita caso contrário). Sempre `>=` o número de
95    /// bytes efetivamente enviados.
96    blob_stream_len: usize,
97    /// Registros pendentes de BLOBs pré-existentes (`op_batch_regblob`): cada par
98    /// é `(id_existente, id_no_batch)`. Enviados em [`Batch::execute`] antes das
99    /// mensagens, junto com o stream de blobs.
100    pending_regblobs: Vec<(u64, u64)>,
101    /// Próximo id (quad) a atribuir em [`Batch::add_blob`]/[`Batch::register_blob`].
102    /// Começa em 1; o id 0 é reservado (quad nulo).
103    next_blob_id: u64,
104    /// Verdadeiro se a instrução tem ao menos uma coluna BLOB (política STREAM
105    /// ativada na criação).
106    blob_stream: bool,
107    /// Charset da conexão, usado para codificar parâmetros de texto em
108    /// [`Batch::add`] (capturado na criação, já que `add` não recebe a conexão).
109    charset: crate::charset::Charset,
110    /// BPB padrão a aplicar aos blobs do batch (`op_batch_set_bpb`), se definido.
111    /// Enviado em [`Batch::execute`] antes do stream de blobs.
112    default_bpb: Option<Vec<u8>>,
113    /// Verdadeiro quando o BPB padrão marca os blobs como segmentados; nesse caso
114    /// [`Batch::add_blob`] enquadra os dados em segmentos.
115    blob_segmented: bool,
116    closed: bool,
117}
118
119/// Alinhamento do cabeçalho de segmento em um blob segmentado
120/// (`IBatch::BLOB_SEGHDR_ALIGN`). Como cada `add_blob` segmentado começa numa
121/// fronteira de 4 bytes (já múltiplo de 2), não há padding a inserir.
122const BLOB_SEGHDR_ALIGN: usize = 2;
123const _: () = assert!(BLOB_ALIGN.is_multiple_of(BLOB_SEGHDR_ALIGN));
124
125/// Arredonda `n` para cima até um múltiplo de `align` (potência de 2).
126#[inline]
127fn align_up(n: usize, align: usize) -> usize {
128    (n + align - 1) & !(align - 1)
129}
130
131/// Verdadeiro se o BPB marca o blob como segmentado: clumplet `isc_bpb_type`(3)
132/// com valor `isc_bpb_type_segmented`(0). Formato do BPB: byte de versão seguido
133/// de clumplets `tag(1) | len(1) | valor(len)`.
134fn bpb_is_segmented(bpb: &[u8]) -> bool {
135    let mut i = 1; // pula o byte de versão
136    while i + 1 < bpb.len() {
137        let tag = bpb[i];
138        let len = bpb[i + 1] as usize;
139        let val = &bpb[i + 2..(i + 2 + len).min(bpb.len())];
140        if tag == bpb::TYPE {
141            return val.first() == Some(&bpb::TYPE_SEGMENTED);
142        }
143        i += 2 + len;
144    }
145    false
146}
147
148/// Alinhamento (em bytes) de cada blob dentro do stream de `op_batch_blob_stream`.
149/// O servidor FB5 usa 4 (confirmado por captura: conteúdo de 30 B → campo de
150/// comprimento 32; de 33 B → 36). É o valor que `IBatch::getBlobAlignment`
151/// retorna para o protocolo remoto.
152const BLOB_ALIGN: usize = 4;
153
154impl Batch {
155    /// O handle da instrução/batch no servidor.
156    pub fn handle(&self) -> i32 {
157        self.handle
158    }
159
160    /// Quantas mensagens estão acumuladas mas ainda não executadas.
161    pub fn pending(&self) -> u32 {
162        self.pending_count
163    }
164
165    /// Os metadados dos parâmetros de entrada (a forma de cada linha esperada).
166    pub fn params(&self) -> &[ColumnMeta] {
167        &self.params
168    }
169
170    /// Registra um BLOB no stream do lote e devolve seu id (quad), que deve ser
171    /// posto no campo BLOB da linha correspondente como [`Value::Blob`].
172    ///
173    /// Só funciona em batches cuja instrução tem coluna BLOB (a política STREAM é
174    /// ativada automaticamente em [`Connection::create_batch`] nesse caso). Como
175    /// [`Self::add`], apenas acumula na memória; os blobs vão à rede em
176    /// [`Self::execute`], **antes** das mensagens que os referenciam.
177    ///
178    /// ```text
179    /// let id = batch.add_blob(b"conteudo do blob")?;
180    /// batch.add(&[Value::Text("BAT31".into()), Value::Blob(id)])?;
181    /// ```
182    pub fn add_blob(&mut self, data: &[u8]) -> Result<u64> {
183        if !self.blob_stream {
184            return Err(Error::protocol(
185                "add_blob exige uma coluna BLOB na instrução do batch",
186            ));
187        }
188        let id = self.next_blob_id;
189        self.next_blob_id += 1;
190        // Cabeçalho do blob, tudo big-endian: id(quad 8B) | size(4B) | bpb_size(4B
191        // = 0, pois o BPB é do batch). Os blobs vão concatenados SEM padding; o
192        // servidor reconstrói o alinhamento no buffer dele.
193        self.pending_blobs.extend_from_slice(&id.to_be_bytes());
194        if self.blob_segmented {
195            // Blob segmentado (um único segmento aqui). NO WIRE: cada segmento é
196            // `u32` big-endian com o comprimento + os bytes, sem padding. Mas o
197            // campo `size` (e o comprimento do stream) seguem a contabilidade do
198            // *buffer* do servidor, que usa cabeçalho de 2 bytes alinhado a
199            // `BLOB_SEGHDR_ALIGN`: `size = align2(2 + len)`. Ver `xdr_blob_stream`.
200            let size_field = align_up(2 + data.len(), BLOB_SEGHDR_ALIGN);
201            self.pending_blobs
202                .extend_from_slice(&(size_field as u32).to_be_bytes());
203            self.pending_blobs.extend_from_slice(&0u32.to_be_bytes()); // bpb_size
204            self.pending_blobs
205                .extend_from_slice(&(data.len() as u32).to_be_bytes());
206            self.pending_blobs.extend_from_slice(data);
207            self.blob_stream_len += align_up(16 + size_field, BLOB_ALIGN);
208        } else {
209            // Blob de stream (não segmentado): dados crus.
210            self.pending_blobs
211                .extend_from_slice(&(data.len() as u32).to_be_bytes());
212            self.pending_blobs.extend_from_slice(&0u32.to_be_bytes()); // bpb_size
213            self.pending_blobs.extend_from_slice(data);
214            self.blob_stream_len += align_up(16 + data.len(), BLOB_ALIGN);
215        }
216        Ok(id)
217    }
218
219    /// Define o BPB (blob parameter buffer) padrão dos blobs do batch
220    /// (`op_batch_set_bpb`). O uso típico é marcar os blobs como **segmentados**
221    /// (`isc_bpb_type = isc_bpb_type_segmented`): nesse caso [`Self::add_blob`]
222    /// passa a enquadrar cada blob como um segmento. Deve ser chamado antes de
223    /// [`Self::add_blob`]; o BPB vai à rede em [`Self::execute`], antes dos blobs.
224    ///
225    /// Conveniência: [`Self::set_segmented`] monta o BPB segmentado para você.
226    pub fn set_default_bpb(&mut self, bpb: Vec<u8>) -> Result<()> {
227        if !self.blob_stream {
228            return Err(Error::protocol(
229                "set_default_bpb exige uma coluna BLOB na instrução do batch",
230            ));
231        }
232        // Detecta o tipo segmentado: clumplet `isc_bpb_type`(3) com valor
233        // `isc_bpb_type_segmented`(0). BPB usa comprimento de 1 byte por clumplet.
234        self.blob_segmented = bpb_is_segmented(&bpb);
235        self.default_bpb = Some(bpb);
236        Ok(())
237    }
238
239    /// Atalho para [`Self::set_default_bpb`] com um BPB que marca os blobs como
240    /// segmentados (`segmentado = true`) ou stream (`false`, o padrão).
241    pub fn set_segmented(&mut self, segmented: bool) -> Result<()> {
242        let kind = if segmented {
243            bpb::TYPE_SEGMENTED
244        } else {
245            bpb::TYPE_STREAM
246        };
247        // BPB no formato que o fbclient envia: versão(1) | tag isc_bpb_type(3) |
248        // len(1) = 4 | valor(4 bytes LE). Capturado do `11.batch.cpp` Parte 3.
249        self.set_default_bpb(vec![BPB_VERSION1, bpb::TYPE, 4, kind, 0, 0, 0])
250    }
251
252    /// Mapeia um BLOB **já existente** (criado fora do batch, p.ex. via
253    /// [`Connection::write_blob`]/[`Connection::create_blob`]) para um id local
254    /// do batch e devolve esse id, que deve ir no campo BLOB da linha como
255    /// [`Value::Blob`]. Útil para reaproveitar BLOBs grandes já gravados sem
256    /// reenviá-los pelo stream (`op_batch_regblob`).
257    ///
258    /// Como [`Self::add_blob`], só vale em batches com coluna BLOB; o registro é
259    /// acumulado e enviado em [`Self::execute`], antes das mensagens.
260    pub fn register_blob(&mut self, existing_id: u64) -> Result<u64> {
261        if !self.blob_stream {
262            return Err(Error::protocol(
263                "register_blob exige uma coluna BLOB na instrução do batch",
264            ));
265        }
266        let batch_id = self.next_blob_id;
267        self.next_blob_id += 1;
268        self.pending_regblobs.push((existing_id, batch_id));
269        Ok(batch_id)
270    }
271
272    /// Adiciona uma linha ao lote. Os valores devem corresponder, em número e
273    /// tipo, aos parâmetros da instrução (ver [`Self::params`]). Apenas acumula
274    /// na memória; nada vai à rede até [`Self::execute`].
275    pub fn add(&mut self, values: &[Value]) -> Result<()> {
276        // Codifica direto no buffer pendente (sem Vec temporário por linha). Em erro,
277        // `encode_row_into` restaura o tamanho de `pending`, então não corrompe o lote.
278        encode_row_into(&mut self.pending, &self.params, values, self.charset)?;
279        self.pending_count += 1;
280        Ok(())
281    }
282
283    /// Envia as mensagens acumuladas e executa o lote, retornando o estado de
284    /// conclusão (contagens por linha e erros por linha). Esvazia o buffer
285    /// pendente; o batch pode então ser reutilizado.
286    pub fn execute(&mut self, conn: &mut Connection, tx: &Transaction) -> Result<BatchResult> {
287        if self.closed {
288            return Err(Error::protocol("batch já foi fechado"));
289        }
290        // 0a. Define o BPB padrão (op_batch_set_bpb), se houver, antes dos blobs.
291        if let Some(bpb) = self.default_bpb.take() {
292            let mut w = op_packet(op::BATCH_SET_BPB);
293            w.put_i32(self.handle);
294            w.put_bytes(&bpb); // cstring: len(4) + bpb + pad
295            conn.io().send(&w)?;
296            read_response(conn.io())?;
297        }
298        // 0b. Envia os BLOBs pendentes (op_batch_blob_stream) ANTES das mensagens,
299        //    pois cada mensagem referencia um id de blob já registrado.
300        if !self.pending_blobs.is_empty() {
301            let mut w = op_packet(op::BATCH_BLOB_STREAM);
302            w.put_i32(self.handle);
303            // Comprimento alinhado (≥ bytes no wire), depois os blobs crus. Não
304            // alinhamos o pacote: o servidor recompõe o alinhamento internamente
305            // e a próxima op pode começar em offset não múltiplo de 4.
306            w.put_i32(self.blob_stream_len as i32);
307            w.put_raw(&self.pending_blobs);
308            conn.io().send(&w)?;
309            read_response(conn.io())?;
310            self.pending_blobs.clear();
311            self.blob_stream_len = 0;
312        }
313        // 0b. Registra BLOBs pré-existentes (op_batch_regblob), também antes das
314        //     mensagens. Layout: stmt | id_existente(quad 8B BE) | id_batch(quad).
315        if !self.pending_regblobs.is_empty() {
316            for (existing_id, batch_id) in std::mem::take(&mut self.pending_regblobs) {
317                let mut w = op_packet(op::BATCH_REGBLOB);
318                w.put_i32(self.handle);
319                w.put_raw(&existing_id.to_be_bytes());
320                w.put_raw(&batch_id.to_be_bytes());
321                conn.io().send(&w)?;
322                read_response(conn.io())?;
323            }
324        }
325        // 1. Envia as mensagens pendentes (op_batch_msg), se houver.
326        if self.pending_count > 0 {
327            let mut w = op_packet(op::BATCH_MSG);
328            w.put_i32(self.handle);
329            w.put_i32(self.pending_count as i32);
330            w.put_raw(&self.pending);
331            w.align();
332            conn.io().send(&w)?;
333            read_response(conn.io())?;
334            self.pending.clear();
335            self.pending_count = 0;
336        }
337
338        // 2. Executa o lote (op_batch_exec) e lê o estado de conclusão.
339        let mut w = op_packet(op::BATCH_EXEC);
340        w.put_i32(self.handle);
341        w.put_i32(tx.handle());
342        conn.io().send(&w)?;
343        read_batch_cs(conn)
344    }
345
346    /// Descarta as mensagens acumuladas que ainda não foram executadas
347    /// (`op_batch_cancel`). Não afeta linhas já executadas em chamadas anteriores.
348    pub fn cancel(&mut self, conn: &mut Connection) -> Result<()> {
349        self.pending.clear();
350        self.pending_count = 0;
351        self.pending_blobs.clear();
352        self.blob_stream_len = 0;
353        self.pending_regblobs.clear();
354        let mut w = op_packet(op::BATCH_CANCEL);
355        w.put_i32(self.handle);
356        conn.io().send(&w)?;
357        read_response(conn.io())?;
358        Ok(())
359    }
360
361    /// Libera o batch e a instrução preparada no servidor (`op_batch_rls` +
362    /// `op_free_statement` com `DSQL_drop`), consumindo o handle.
363    pub fn close(mut self, conn: &mut Connection) -> Result<()> {
364        self.closed = true;
365        let mut w = op_packet(op::BATCH_RLS);
366        w.put_i32(self.handle);
367        conn.io().send(&w)?;
368        read_response(conn.io())?;
369
370        let mut w = op_packet(op::FREE_STATEMENT);
371        w.put_i32(self.handle);
372        w.put_i32(free::DROP);
373        conn.io().send(&w)?;
374        read_response(conn.io())?;
375        Ok(())
376    }
377}
378
379impl Drop for Batch {
380    fn drop(&mut self) {
381        if !self.closed {
382            crate::warn_unclosed("Batch", self.handle);
383        }
384    }
385}
386
387/// Opções de criação de um [`Batch`] (clumplets do `op_batch_create`).
388#[derive(Debug, Clone, Copy)]
389pub struct BatchOptions {
390    /// Se `true`, o servidor CONTINUA após uma linha falhar, executando as demais e
391    /// reportando o erro de cada uma em [`BatchResult::errors`]. Para isso ele
392    /// bracketa cada linha num savepoint interno — o que tem **custo por linha**.
393    ///
394    /// Se `false` (**padrão**), o lote PARA na primeira linha que falha (fail-fast):
395    /// é bem mais rápido (sem savepoint por linha) e é o que se quer quando a
396    /// transação é "tudo ou nada" e qualquer erro aborta a operação inteira.
397    pub multierror: bool,
398    /// Se `true` (**padrão**), o servidor reporta a contagem de linhas afetadas por
399    /// mensagem em [`BatchResult::update_counts`] (e portanto [`BatchResult::total_affected`]).
400    pub record_counts: bool,
401}
402
403impl Default for BatchOptions {
404    fn default() -> Self {
405        // Fail-fast por padrão: a maioria dos usos de DML em lote quer abortar no
406        // primeiro erro, e o multierror (savepoint por linha) só compensa quando o
407        // chamador realmente vai inspecionar o erro de cada linha.
408        BatchOptions {
409            multierror: false,
410            record_counts: true,
411        }
412    }
413}
414
415impl BatchOptions {
416    /// Opções padrão (fail-fast, com contagens por linha).
417    pub fn new() -> Self {
418        Self::default()
419    }
420
421    /// Liga/desliga o modo multierro (continuar após erros por linha).
422    pub fn multierror(mut self, on: bool) -> Self {
423        self.multierror = on;
424        self
425    }
426
427    /// Liga/desliga o reporte de contagens de linhas afetadas por mensagem.
428    pub fn record_counts(mut self, on: bool) -> Self {
429        self.record_counts = on;
430        self
431    }
432}
433
434impl Connection {
435    /// Prepara uma instrução e cria um lote (batch) sobre ela com as opções padrão
436    /// ([`BatchOptions::default`]: fail-fast, com contagens por linha). A instrução
437    /// deve ter parâmetros (`?`) — cada [`Batch::add`] fornece uma linha de valores.
438    ///
439    /// Para continuar após erros por linha e coletá-los todos, use
440    /// [`Self::create_batch_with`] com `BatchOptions::new().multierror(true)`.
441    pub fn create_batch(&mut self, tx: &Transaction, sql: &str) -> Result<Batch> {
442        self.create_batch_with(tx, sql, BatchOptions::default())
443    }
444
445    /// Como [`Self::create_batch`], mas com [`BatchOptions`] explícitas.
446    pub fn create_batch_with(
447        &mut self,
448        tx: &Transaction,
449        sql: &str,
450        opts: BatchOptions,
451    ) -> Result<Batch> {
452        let mut stmt = self.prepare(tx, sql)?;
453        let handle = stmt.handle();
454        let params: Vec<ColumnMeta> = stmt.params().to_vec();
455        // O handle passa a viver no Batch (liberado por Batch::close), então
456        // marcamos como transferido para não disparar o aviso de Drop e soltamos
457        // o wrapper aqui (libera só memória).
458        stmt.forget_handle();
459        drop(stmt);
460
461        let blr = message_blr(&params);
462        let msglen = message_buffer_len(&params);
463
464        // Se houver coluna BLOB, ativa a política STREAM: os blobs são enviados
465        // em `op_batch_blob_stream` e a linha referencia o id (ver `add_blob`).
466        let blob_stream = params
467            .iter()
468            .any(|c| sql_type::base(c.sql_type) == sql_type::BLOB);
469
470        // Buffer de parâmetros do batch: byte de versão (1) seguido de clumplets
471        // com comprimento LE de 4 bytes. Os tags são opcionais (ver `BatchOptions`).
472        let mut pb = ParameterBuffer::new(1);
473        if opts.record_counts {
474            pb.bytes_be_len4(batch_tag::RECORD_COUNTS, &1u32.to_le_bytes());
475        }
476        if opts.multierror {
477            pb.bytes_be_len4(batch_tag::MULTIERROR, &1u32.to_le_bytes());
478        }
479        if blob_stream {
480            pb.bytes_be_len4(
481                batch_tag::BLOB_POLICY,
482                &(blob_policy::STREAM as u32).to_le_bytes(),
483            );
484        }
485
486        let mut w = op_packet(op::BATCH_CREATE);
487        w.put_i32(handle);
488        w.put_bytes(&blr); // cstring: len(4) + blr + pad
489        w.put_i32(msglen as i32);
490        w.put_bytes(pb.as_slice()); // cstring: len(4) + pb + pad
491        self.io().send(&w)?;
492        read_response(self.io())?;
493
494        Ok(Batch {
495            handle,
496            params,
497            pending: Vec::new(),
498            pending_count: 0,
499            pending_blobs: Vec::new(),
500            blob_stream_len: 0,
501            pending_regblobs: Vec::new(),
502            next_blob_id: 1,
503            blob_stream,
504            charset: self.charset(),
505            default_bpb: None,
506            blob_segmented: false,
507            closed: false,
508        })
509    }
510}
511
512/// Resultado da execução de um lote: o estado de conclusão por mensagem.
513#[derive(Debug, Clone, Default)]
514pub struct BatchResult {
515    /// Total de mensagens processadas nesta execução.
516    pub total: u32,
517    /// Contagem de linhas afetadas por mensagem, na ordem em que foram
518    /// adicionadas. `>= 0` é o número de linhas; [`batch_cs::EXECUTE_FAILED`]
519    /// (−1) marca uma mensagem que falhou; [`batch_cs::SUCCESS_NO_INFO`] (−2)
520    /// indica sucesso sem contagem reportada.
521    pub update_counts: Vec<i32>,
522    /// Erros detalhados por mensagem (índice da mensagem + erro do servidor).
523    pub errors: Vec<BatchError>,
524}
525
526impl BatchResult {
527    /// Verdadeiro se nenhuma mensagem falhou.
528    pub fn all_succeeded(&self) -> bool {
529        self.errors.is_empty() && !self.update_counts.contains(&batch_cs::EXECUTE_FAILED)
530    }
531
532    /// Soma das linhas afetadas pelas mensagens bem-sucedidas (ignora as que
533    /// falharam ou não reportaram contagem).
534    pub fn total_affected(&self) -> u64 {
535        self.update_counts
536            .iter()
537            .filter(|&&c| c >= 0)
538            .map(|&c| c as u64)
539            .sum()
540    }
541}
542
543/// Um erro detalhado de uma mensagem específica do lote.
544#[derive(Debug, Clone)]
545pub struct BatchError {
546    /// Índice (base zero) da mensagem que falhou, na ordem de adição.
547    pub message_index: u32,
548    /// O erro reportado pelo servidor para essa mensagem.
549    pub error: DatabaseError,
550}
551
552/// Lê a resposta `op_batch_cs` de um `op_batch_exec`.
553///
554/// Layout (confirmado por captura, inclusive com erros forçados):
555/// `op | stmt | reccount | updates | vectors | errors |`
556/// `updates×i32 (contagens) | vectors×(pos u32 + status vector) | errors×u32`.
557fn read_batch_cs(conn: &mut Connection) -> Result<BatchResult> {
558    let code = read_op(conn.io())?;
559    if code == op::RESPONSE {
560        // Falha global (não por linha) veio como op_response.
561        read_response_body(conn.io())?.into_result()?;
562        return Err(Error::protocol(
563            "op_batch_exec retornou op_response sem erro",
564        ));
565    }
566    if code != op::BATCH_CS {
567        return Err(Error::protocol(format!(
568            "esperava op_batch_cs, veio {} ({code})",
569            op_name(code)
570        )));
571    }
572
573    let _stmt = conn.io().read_i32()?;
574    let reccount = conn.io().read_i32()? as u32;
575    let updates = conn.io().read_i32()? as u32;
576    let vectors = conn.io().read_i32()? as u32;
577    let errors = conn.io().read_i32()? as u32;
578
579    let mut update_counts = Vec::with_capacity(updates as usize);
580    for _ in 0..updates {
581        update_counts.push(conn.io().read_i32()?);
582    }
583
584    let mut batch_errors = Vec::with_capacity(vectors as usize);
585    for _ in 0..vectors {
586        let pos = conn.io().read_i32()? as u32;
587        let status = read_status_vector(conn.io())?;
588        batch_errors.push(BatchError {
589            message_index: pos,
590            error: DatabaseError::new(status),
591        });
592    }
593    // Lista simples de posições com erro (quando os detalhes não são pedidos).
594    for _ in 0..errors {
595        let pos = conn.io().read_i32()? as u32;
596        if !batch_errors.iter().any(|e| e.message_index == pos) {
597            let empty = StatusVector {
598                args: Vec::new(),
599                sql_state: None,
600            };
601            batch_errors.push(BatchError {
602                message_index: pos,
603                error: DatabaseError::new(empty),
604            });
605        }
606    }
607
608    Ok(BatchResult {
609        total: reccount,
610        update_counts,
611        errors: batch_errors,
612    })
613}