firebird-wire 0.1.2

Pure-Rust sync driver for Firebird 5+ (wire protocol, SRP auth, batch/array DML)
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
//! DML em lote (batch) — o recurso "principal" de array DML do Firebird 4+.
//!
//! Um [`Batch`] insere/atualiza/exclui muitas linhas com uma única instrução
//! preparada, acumulando mensagens no cliente e enviando-as ao servidor em uma
//! ida só. É muito mais rápido que executar a instrução linha a linha.
//!
//! ```text
//! let mut batch = conn.create_batch(&tx, "INSERT INTO t (a, b) VALUES (?, ?)")?;
//! batch.add(&[Value::Int(1), Value::Text("um".into())])?;
//! batch.add(&[Value::Int(2), Value::Text("dois".into())])?;
//! let result = batch.execute(&mut conn, &tx)?;   // envia + executa
//! println!("{} linhas afetadas no total", result.total_affected());
//! batch.close(&mut conn)?;                        // libera no servidor
//! ```
//!
//! # Protocolo (FB4+, op codes 99–103)
//!
//! Descoberto por captura de um cliente C usando a interface OO `IBatch`:
//! 1. `op_batch_create` (99): `stmt | blr(cstring) | msglen(u32) | pb(cstring)`.
//!    O BLR descreve o formato da mensagem (igual ao `in_blr` de `op_execute`);
//!    `msglen` é o tamanho do buffer de mensagem do lado do cliente.
//! 2. `op_batch_msg` (100): `stmt | count(u32) | mensagens`, cada mensagem no
//!    mesmo formato compacto (bitmap de nulos + valores XDR) usado em `op_execute`.
//! 3. `op_batch_exec` (101): `stmt | transaction`. Responde com `op_batch_cs`.
//! 4. `op_batch_cs` (103): estado de conclusão (contagens por linha + erros).
//! 5. `op_batch_rls` (102): libera o batch no servidor.
//!
//! O cliente C agrupa create+msg e usa `op_batch_sync` (110) para drenar as
//! respostas adiadas; nós, sendo síncronos, lemos a resposta de cada op na hora.
//!
//! # BLOBs em lote (política STREAM, `op_batch_blob_stream` 105)
//!
//! Quando a instrução tem coluna BLOB, [`Connection::create_batch`] ativa a
//! política `BLOB_STREAM` no PB (`TAG_BLOB_POLICY` = 3). O chamador então usa
//! [`Batch::add_blob`] para registrar cada blob e recebe um id (quad) para pôr
//! no campo da linha. Em [`Batch::execute`], antes das mensagens, enviamos:
//!
//! `op_batch_blob_stream` (105): `stmt | length(u32) | stream`. O `stream` é a
//! concatenação CRUA (sem padding) dos blobs, cada um `id(quad 8B BE) | size(4B
//! BE) | bpb_size(4B BE) | bpb | dados`. O `length` NÃO é o número de bytes no
//! wire: é o tamanho do buffer que o servidor aloca — a soma de `align4(16 +
//! bpb + dados)` de cada blob — e deve ser múltiplo de 4 (o servidor rejeita
//! caso contrário). Ao percorrer o stream (ver `xdr_blob_stream` no servidor),
//! ele avança o ponteiro do buffer com alinhamento de 4 SEM consumir bytes do
//! wire para o padding; o laço termina quando o que resta é menor que um
//! cabeçalho (16 B) ou chega a zero. Por isso o wire carrega menos bytes que
//! `length`, e a próxima op pode começar em offset não múltiplo de 4.
//! Confirmado por captura do `11.batch.cpp` (conteúdo 30 B → length 32; 33 B →
//! 36) e pelo código do servidor. Os blobs vão **antes** das mensagens porque a
//! linha referencia o id já registrado.
//!
//! `op_batch_regblob` (104): `stmt | id_existente(quad 8B BE) | id_batch(quad)`.
//! Mapeia um BLOB já gravado fora do batch ([`Connection::write_blob`]) a um id
//! local, evitando reenviar os dados. Ver [`Batch::register_blob`].
//!
//! `op_batch_set_bpb` (106): `stmt | bpb(cstring)`. Define o BPB padrão dos
//! blobs do batch; o servidor lê o `isc_bpb_type` para marcar os blobs como
//! segmentados ou stream. Ver [`Batch::set_default_bpb`] / [`Batch::set_segmented`].
//! Em modo segmentado, cada segmento no stream é um `u32` big-endian com o
//! comprimento seguido dos bytes, SEM padding; já o campo `size` do blob segue a
//! contabilidade do buffer do servidor (cabeçalho de 2 bytes alinhado a 2), ou
//! seja `align2(2 + len)`. Capturado da Parte 3 do `11.batch.cpp`.

use crate::blr::message_blr;
use crate::connection::Connection;
use crate::error::{DatabaseError, Error, Result, StatusVector};
use crate::message::{encode_row_into, message_buffer_len};
use crate::transaction::Transaction;
use crate::value::{ColumnMeta, Value};
use crate::wire::consts::*;
use crate::wire::response::{read_op, read_response, read_response_body, read_status_vector};
use crate::wire::stream::{op_name, op_packet};
use crate::wire::xdr::ParameterBuffer;

/// Um lote de mensagens vinculado a uma instrução preparada no servidor.
///
/// Crie-o com [`Connection::create_batch`], acumule linhas com [`Batch::add`] e
/// envie com [`Batch::execute`]. O batch pode ser reutilizado (adicione mais e
/// execute de novo). Ao terminar, chame [`Batch::close`] para liberá-lo.
pub struct Batch {
    handle: i32,
    params: Vec<ColumnMeta>,
    /// Mensagens já codificadas mas ainda não enviadas/executadas.
    pending: Vec<u8>,
    pending_count: u32,
    /// Stream de BLOBs acumulado (cabeçalho + dados de cada blob), ainda não
    /// enviado. Só usado em batches com coluna BLOB (política STREAM). Os blobs
    /// vão concatenados SEM padding no wire (o servidor reconstrói o alinhamento
    /// no buffer dele).
    pending_blobs: Vec<u8>,
    /// Comprimento *alinhado* do stream de blobs (campo `p_batch_blob_data` do
    /// op_batch_blob_stream): a soma de `align4(16 + dados)` de cada blob. É o
    /// tamanho do buffer que o servidor aloca, e deve ser múltiplo de
    /// `BLOB_ALIGN` (o servidor rejeita caso contrário). Sempre `>=` o número de
    /// bytes efetivamente enviados.
    blob_stream_len: usize,
    /// Registros pendentes de BLOBs pré-existentes (`op_batch_regblob`): cada par
    /// é `(id_existente, id_no_batch)`. Enviados em [`Batch::execute`] antes das
    /// mensagens, junto com o stream de blobs.
    pending_regblobs: Vec<(u64, u64)>,
    /// Próximo id (quad) a atribuir em [`Batch::add_blob`]/[`Batch::register_blob`].
    /// Começa em 1; o id 0 é reservado (quad nulo).
    next_blob_id: u64,
    /// Verdadeiro se a instrução tem ao menos uma coluna BLOB (política STREAM
    /// ativada na criação).
    blob_stream: bool,
    /// Charset da conexão, usado para codificar parâmetros de texto em
    /// [`Batch::add`] (capturado na criação, já que `add` não recebe a conexão).
    charset: crate::charset::Charset,
    /// BPB padrão a aplicar aos blobs do batch (`op_batch_set_bpb`), se definido.
    /// Enviado em [`Batch::execute`] antes do stream de blobs.
    default_bpb: Option<Vec<u8>>,
    /// Verdadeiro quando o BPB padrão marca os blobs como segmentados; nesse caso
    /// [`Batch::add_blob`] enquadra os dados em segmentos.
    blob_segmented: bool,
    closed: bool,
}

/// Alinhamento do cabeçalho de segmento em um blob segmentado
/// (`IBatch::BLOB_SEGHDR_ALIGN`). Como cada `add_blob` segmentado começa numa
/// fronteira de 4 bytes (já múltiplo de 2), não há padding a inserir.
const BLOB_SEGHDR_ALIGN: usize = 2;
const _: () = assert!(BLOB_ALIGN.is_multiple_of(BLOB_SEGHDR_ALIGN));

/// Arredonda `n` para cima até um múltiplo de `align` (potência de 2).
#[inline]
fn align_up(n: usize, align: usize) -> usize {
    (n + align - 1) & !(align - 1)
}

/// Verdadeiro se o BPB marca o blob como segmentado: clumplet `isc_bpb_type`(3)
/// com valor `isc_bpb_type_segmented`(0). Formato do BPB: byte de versão seguido
/// de clumplets `tag(1) | len(1) | valor(len)`.
fn bpb_is_segmented(bpb: &[u8]) -> bool {
    let mut i = 1; // pula o byte de versão
    while i + 1 < bpb.len() {
        let tag = bpb[i];
        let len = bpb[i + 1] as usize;
        let val = &bpb[i + 2..(i + 2 + len).min(bpb.len())];
        if tag == bpb::TYPE {
            return val.first() == Some(&bpb::TYPE_SEGMENTED);
        }
        i += 2 + len;
    }
    false
}

/// Alinhamento (em bytes) de cada blob dentro do stream de `op_batch_blob_stream`.
/// O servidor FB5 usa 4 (confirmado por captura: conteúdo de 30 B → campo de
/// comprimento 32; de 33 B → 36). É o valor que `IBatch::getBlobAlignment`
/// retorna para o protocolo remoto.
const BLOB_ALIGN: usize = 4;

impl Batch {
    /// O handle da instrução/batch no servidor.
    pub fn handle(&self) -> i32 {
        self.handle
    }

    /// Quantas mensagens estão acumuladas mas ainda não executadas.
    pub fn pending(&self) -> u32 {
        self.pending_count
    }

    /// Os metadados dos parâmetros de entrada (a forma de cada linha esperada).
    pub fn params(&self) -> &[ColumnMeta] {
        &self.params
    }

    /// Registra um BLOB no stream do lote e devolve seu id (quad), que deve ser
    /// posto no campo BLOB da linha correspondente como [`Value::Blob`].
    ///
    /// Só funciona em batches cuja instrução tem coluna BLOB (a política STREAM é
    /// ativada automaticamente em [`Connection::create_batch`] nesse caso). Como
    /// [`Self::add`], apenas acumula na memória; os blobs vão à rede em
    /// [`Self::execute`], **antes** das mensagens que os referenciam.
    ///
    /// ```text
    /// let id = batch.add_blob(b"conteudo do blob")?;
    /// batch.add(&[Value::Text("BAT31".into()), Value::Blob(id)])?;
    /// ```
    pub fn add_blob(&mut self, data: &[u8]) -> Result<u64> {
        if !self.blob_stream {
            return Err(Error::protocol(
                "add_blob exige uma coluna BLOB na instrução do batch",
            ));
        }
        let id = self.next_blob_id;
        self.next_blob_id += 1;
        // Cabeçalho do blob, tudo big-endian: id(quad 8B) | size(4B) | bpb_size(4B
        // = 0, pois o BPB é do batch). Os blobs vão concatenados SEM padding; o
        // servidor reconstrói o alinhamento no buffer dele.
        self.pending_blobs.extend_from_slice(&id.to_be_bytes());
        if self.blob_segmented {
            // Blob segmentado (um único segmento aqui). NO WIRE: cada segmento é
            // `u32` big-endian com o comprimento + os bytes, sem padding. Mas o
            // campo `size` (e o comprimento do stream) seguem a contabilidade do
            // *buffer* do servidor, que usa cabeçalho de 2 bytes alinhado a
            // `BLOB_SEGHDR_ALIGN`: `size = align2(2 + len)`. Ver `xdr_blob_stream`.
            let size_field = align_up(2 + data.len(), BLOB_SEGHDR_ALIGN);
            self.pending_blobs
                .extend_from_slice(&(size_field as u32).to_be_bytes());
            self.pending_blobs.extend_from_slice(&0u32.to_be_bytes()); // bpb_size
            self.pending_blobs
                .extend_from_slice(&(data.len() as u32).to_be_bytes());
            self.pending_blobs.extend_from_slice(data);
            self.blob_stream_len += align_up(16 + size_field, BLOB_ALIGN);
        } else {
            // Blob de stream (não segmentado): dados crus.
            self.pending_blobs
                .extend_from_slice(&(data.len() as u32).to_be_bytes());
            self.pending_blobs.extend_from_slice(&0u32.to_be_bytes()); // bpb_size
            self.pending_blobs.extend_from_slice(data);
            self.blob_stream_len += align_up(16 + data.len(), BLOB_ALIGN);
        }
        Ok(id)
    }

    /// Define o BPB (blob parameter buffer) padrão dos blobs do batch
    /// (`op_batch_set_bpb`). O uso típico é marcar os blobs como **segmentados**
    /// (`isc_bpb_type = isc_bpb_type_segmented`): nesse caso [`Self::add_blob`]
    /// passa a enquadrar cada blob como um segmento. Deve ser chamado antes de
    /// [`Self::add_blob`]; o BPB vai à rede em [`Self::execute`], antes dos blobs.
    ///
    /// Conveniência: [`Self::set_segmented`] monta o BPB segmentado para você.
    pub fn set_default_bpb(&mut self, bpb: Vec<u8>) -> Result<()> {
        if !self.blob_stream {
            return Err(Error::protocol(
                "set_default_bpb exige uma coluna BLOB na instrução do batch",
            ));
        }
        // Detecta o tipo segmentado: clumplet `isc_bpb_type`(3) com valor
        // `isc_bpb_type_segmented`(0). BPB usa comprimento de 1 byte por clumplet.
        self.blob_segmented = bpb_is_segmented(&bpb);
        self.default_bpb = Some(bpb);
        Ok(())
    }

    /// Atalho para [`Self::set_default_bpb`] com um BPB que marca os blobs como
    /// segmentados (`segmentado = true`) ou stream (`false`, o padrão).
    pub fn set_segmented(&mut self, segmented: bool) -> Result<()> {
        let kind = if segmented {
            bpb::TYPE_SEGMENTED
        } else {
            bpb::TYPE_STREAM
        };
        // BPB no formato que o fbclient envia: versão(1) | tag isc_bpb_type(3) |
        // len(1) = 4 | valor(4 bytes LE). Capturado do `11.batch.cpp` Parte 3.
        self.set_default_bpb(vec![BPB_VERSION1, bpb::TYPE, 4, kind, 0, 0, 0])
    }

    /// Mapeia um BLOB **já existente** (criado fora do batch, p.ex. via
    /// [`Connection::write_blob`]/[`Connection::create_blob`]) para um id local
    /// do batch e devolve esse id, que deve ir no campo BLOB da linha como
    /// [`Value::Blob`]. Útil para reaproveitar BLOBs grandes já gravados sem
    /// reenviá-los pelo stream (`op_batch_regblob`).
    ///
    /// Como [`Self::add_blob`], só vale em batches com coluna BLOB; o registro é
    /// acumulado e enviado em [`Self::execute`], antes das mensagens.
    pub fn register_blob(&mut self, existing_id: u64) -> Result<u64> {
        if !self.blob_stream {
            return Err(Error::protocol(
                "register_blob exige uma coluna BLOB na instrução do batch",
            ));
        }
        let batch_id = self.next_blob_id;
        self.next_blob_id += 1;
        self.pending_regblobs.push((existing_id, batch_id));
        Ok(batch_id)
    }

    /// Adiciona uma linha ao lote. Os valores devem corresponder, em número e
    /// tipo, aos parâmetros da instrução (ver [`Self::params`]). Apenas acumula
    /// na memória; nada vai à rede até [`Self::execute`].
    pub fn add(&mut self, values: &[Value]) -> Result<()> {
        // Codifica direto no buffer pendente (sem Vec temporário por linha). Em erro,
        // `encode_row_into` restaura o tamanho de `pending`, então não corrompe o lote.
        encode_row_into(&mut self.pending, &self.params, values, self.charset)?;
        self.pending_count += 1;
        Ok(())
    }

    /// Envia as mensagens acumuladas e executa o lote, retornando o estado de
    /// conclusão (contagens por linha e erros por linha). Esvazia o buffer
    /// pendente; o batch pode então ser reutilizado.
    pub fn execute(&mut self, conn: &mut Connection, tx: &Transaction) -> Result<BatchResult> {
        if self.closed {
            return Err(Error::protocol("batch já foi fechado"));
        }
        // 0a. Define o BPB padrão (op_batch_set_bpb), se houver, antes dos blobs.
        if let Some(bpb) = self.default_bpb.take() {
            let mut w = op_packet(op::BATCH_SET_BPB);
            w.put_i32(self.handle);
            w.put_bytes(&bpb); // cstring: len(4) + bpb + pad
            conn.io().send(&w)?;
            read_response(conn.io())?;
        }
        // 0b. Envia os BLOBs pendentes (op_batch_blob_stream) ANTES das mensagens,
        //    pois cada mensagem referencia um id de blob já registrado.
        if !self.pending_blobs.is_empty() {
            let mut w = op_packet(op::BATCH_BLOB_STREAM);
            w.put_i32(self.handle);
            // Comprimento alinhado (≥ bytes no wire), depois os blobs crus. Não
            // alinhamos o pacote: o servidor recompõe o alinhamento internamente
            // e a próxima op pode começar em offset não múltiplo de 4.
            w.put_i32(self.blob_stream_len as i32);
            w.put_raw(&self.pending_blobs);
            conn.io().send(&w)?;
            read_response(conn.io())?;
            self.pending_blobs.clear();
            self.blob_stream_len = 0;
        }
        // 0b. Registra BLOBs pré-existentes (op_batch_regblob), também antes das
        //     mensagens. Layout: stmt | id_existente(quad 8B BE) | id_batch(quad).
        if !self.pending_regblobs.is_empty() {
            for (existing_id, batch_id) in std::mem::take(&mut self.pending_regblobs) {
                let mut w = op_packet(op::BATCH_REGBLOB);
                w.put_i32(self.handle);
                w.put_raw(&existing_id.to_be_bytes());
                w.put_raw(&batch_id.to_be_bytes());
                conn.io().send(&w)?;
                read_response(conn.io())?;
            }
        }
        // 1. Envia as mensagens pendentes (op_batch_msg), se houver.
        if self.pending_count > 0 {
            let mut w = op_packet(op::BATCH_MSG);
            w.put_i32(self.handle);
            w.put_i32(self.pending_count as i32);
            w.put_raw(&self.pending);
            w.align();
            conn.io().send(&w)?;
            read_response(conn.io())?;
            self.pending.clear();
            self.pending_count = 0;
        }

        // 2. Executa o lote (op_batch_exec) e lê o estado de conclusão.
        let mut w = op_packet(op::BATCH_EXEC);
        w.put_i32(self.handle);
        w.put_i32(tx.handle());
        conn.io().send(&w)?;
        read_batch_cs(conn)
    }

    /// Descarta as mensagens acumuladas que ainda não foram executadas
    /// (`op_batch_cancel`). Não afeta linhas já executadas em chamadas anteriores.
    pub fn cancel(&mut self, conn: &mut Connection) -> Result<()> {
        self.pending.clear();
        self.pending_count = 0;
        self.pending_blobs.clear();
        self.blob_stream_len = 0;
        self.pending_regblobs.clear();
        let mut w = op_packet(op::BATCH_CANCEL);
        w.put_i32(self.handle);
        conn.io().send(&w)?;
        read_response(conn.io())?;
        Ok(())
    }

    /// Libera o batch e a instrução preparada no servidor (`op_batch_rls` +
    /// `op_free_statement` com `DSQL_drop`), consumindo o handle.
    pub fn close(mut self, conn: &mut Connection) -> Result<()> {
        self.closed = true;
        let mut w = op_packet(op::BATCH_RLS);
        w.put_i32(self.handle);
        conn.io().send(&w)?;
        read_response(conn.io())?;

        let mut w = op_packet(op::FREE_STATEMENT);
        w.put_i32(self.handle);
        w.put_i32(free::DROP);
        conn.io().send(&w)?;
        read_response(conn.io())?;
        Ok(())
    }
}

impl Drop for Batch {
    fn drop(&mut self) {
        if !self.closed {
            crate::warn_unclosed("Batch", self.handle);
        }
    }
}

/// Opções de criação de um [`Batch`] (clumplets do `op_batch_create`).
#[derive(Debug, Clone, Copy)]
pub struct BatchOptions {
    /// Se `true`, o servidor CONTINUA após uma linha falhar, executando as demais e
    /// reportando o erro de cada uma em [`BatchResult::errors`]. Para isso ele
    /// bracketa cada linha num savepoint interno — o que tem **custo por linha**.
    ///
    /// Se `false` (**padrão**), o lote PARA na primeira linha que falha (fail-fast):
    /// é bem mais rápido (sem savepoint por linha) e é o que se quer quando a
    /// transação é "tudo ou nada" e qualquer erro aborta a operação inteira.
    pub multierror: bool,
    /// Se `true` (**padrão**), o servidor reporta a contagem de linhas afetadas por
    /// mensagem em [`BatchResult::update_counts`] (e portanto [`BatchResult::total_affected`]).
    pub record_counts: bool,
}

impl Default for BatchOptions {
    fn default() -> Self {
        // Fail-fast por padrão: a maioria dos usos de DML em lote quer abortar no
        // primeiro erro, e o multierror (savepoint por linha) só compensa quando o
        // chamador realmente vai inspecionar o erro de cada linha.
        BatchOptions {
            multierror: false,
            record_counts: true,
        }
    }
}

impl BatchOptions {
    /// Opções padrão (fail-fast, com contagens por linha).
    pub fn new() -> Self {
        Self::default()
    }

    /// Liga/desliga o modo multierro (continuar após erros por linha).
    pub fn multierror(mut self, on: bool) -> Self {
        self.multierror = on;
        self
    }

    /// Liga/desliga o reporte de contagens de linhas afetadas por mensagem.
    pub fn record_counts(mut self, on: bool) -> Self {
        self.record_counts = on;
        self
    }
}

impl Connection {
    /// Prepara uma instrução e cria um lote (batch) sobre ela com as opções padrão
    /// ([`BatchOptions::default`]: fail-fast, com contagens por linha). A instrução
    /// deve ter parâmetros (`?`) — cada [`Batch::add`] fornece uma linha de valores.
    ///
    /// Para continuar após erros por linha e coletá-los todos, use
    /// [`Self::create_batch_with`] com `BatchOptions::new().multierror(true)`.
    pub fn create_batch(&mut self, tx: &Transaction, sql: &str) -> Result<Batch> {
        self.create_batch_with(tx, sql, BatchOptions::default())
    }

    /// Como [`Self::create_batch`], mas com [`BatchOptions`] explícitas.
    pub fn create_batch_with(
        &mut self,
        tx: &Transaction,
        sql: &str,
        opts: BatchOptions,
    ) -> Result<Batch> {
        let mut stmt = self.prepare(tx, sql)?;
        let handle = stmt.handle();
        let params: Vec<ColumnMeta> = stmt.params().to_vec();
        // O handle passa a viver no Batch (liberado por Batch::close), então
        // marcamos como transferido para não disparar o aviso de Drop e soltamos
        // o wrapper aqui (libera só memória).
        stmt.forget_handle();
        drop(stmt);

        let blr = message_blr(&params);
        let msglen = message_buffer_len(&params);

        // Se houver coluna BLOB, ativa a política STREAM: os blobs são enviados
        // em `op_batch_blob_stream` e a linha referencia o id (ver `add_blob`).
        let blob_stream = params
            .iter()
            .any(|c| sql_type::base(c.sql_type) == sql_type::BLOB);

        // Buffer de parâmetros do batch: byte de versão (1) seguido de clumplets
        // com comprimento LE de 4 bytes. Os tags são opcionais (ver `BatchOptions`).
        let mut pb = ParameterBuffer::new(1);
        if opts.record_counts {
            pb.bytes_be_len4(batch_tag::RECORD_COUNTS, &1u32.to_le_bytes());
        }
        if opts.multierror {
            pb.bytes_be_len4(batch_tag::MULTIERROR, &1u32.to_le_bytes());
        }
        if blob_stream {
            pb.bytes_be_len4(
                batch_tag::BLOB_POLICY,
                &(blob_policy::STREAM as u32).to_le_bytes(),
            );
        }

        let mut w = op_packet(op::BATCH_CREATE);
        w.put_i32(handle);
        w.put_bytes(&blr); // cstring: len(4) + blr + pad
        w.put_i32(msglen as i32);
        w.put_bytes(pb.as_slice()); // cstring: len(4) + pb + pad
        self.io().send(&w)?;
        read_response(self.io())?;

        Ok(Batch {
            handle,
            params,
            pending: Vec::new(),
            pending_count: 0,
            pending_blobs: Vec::new(),
            blob_stream_len: 0,
            pending_regblobs: Vec::new(),
            next_blob_id: 1,
            blob_stream,
            charset: self.charset(),
            default_bpb: None,
            blob_segmented: false,
            closed: false,
        })
    }
}

/// Resultado da execução de um lote: o estado de conclusão por mensagem.
#[derive(Debug, Clone, Default)]
pub struct BatchResult {
    /// Total de mensagens processadas nesta execução.
    pub total: u32,
    /// Contagem de linhas afetadas por mensagem, na ordem em que foram
    /// adicionadas. `>= 0` é o número de linhas; [`batch_cs::EXECUTE_FAILED`]
    /// (−1) marca uma mensagem que falhou; [`batch_cs::SUCCESS_NO_INFO`] (−2)
    /// indica sucesso sem contagem reportada.
    pub update_counts: Vec<i32>,
    /// Erros detalhados por mensagem (índice da mensagem + erro do servidor).
    pub errors: Vec<BatchError>,
}

impl BatchResult {
    /// Verdadeiro se nenhuma mensagem falhou.
    pub fn all_succeeded(&self) -> bool {
        self.errors.is_empty() && !self.update_counts.contains(&batch_cs::EXECUTE_FAILED)
    }

    /// Soma das linhas afetadas pelas mensagens bem-sucedidas (ignora as que
    /// falharam ou não reportaram contagem).
    pub fn total_affected(&self) -> u64 {
        self.update_counts
            .iter()
            .filter(|&&c| c >= 0)
            .map(|&c| c as u64)
            .sum()
    }
}

/// Um erro detalhado de uma mensagem específica do lote.
#[derive(Debug, Clone)]
pub struct BatchError {
    /// Índice (base zero) da mensagem que falhou, na ordem de adição.
    pub message_index: u32,
    /// O erro reportado pelo servidor para essa mensagem.
    pub error: DatabaseError,
}

/// Lê a resposta `op_batch_cs` de um `op_batch_exec`.
///
/// Layout (confirmado por captura, inclusive com erros forçados):
/// `op | stmt | reccount | updates | vectors | errors |`
/// `updates×i32 (contagens) | vectors×(pos u32 + status vector) | errors×u32`.
fn read_batch_cs(conn: &mut Connection) -> Result<BatchResult> {
    let code = read_op(conn.io())?;
    if code == op::RESPONSE {
        // Falha global (não por linha) veio como op_response.
        read_response_body(conn.io())?.into_result()?;
        return Err(Error::protocol(
            "op_batch_exec retornou op_response sem erro",
        ));
    }
    if code != op::BATCH_CS {
        return Err(Error::protocol(format!(
            "esperava op_batch_cs, veio {} ({code})",
            op_name(code)
        )));
    }

    let _stmt = conn.io().read_i32()?;
    let reccount = conn.io().read_i32()? as u32;
    let updates = conn.io().read_i32()? as u32;
    let vectors = conn.io().read_i32()? as u32;
    let errors = conn.io().read_i32()? as u32;

    let mut update_counts = Vec::with_capacity(updates as usize);
    for _ in 0..updates {
        update_counts.push(conn.io().read_i32()?);
    }

    let mut batch_errors = Vec::with_capacity(vectors as usize);
    for _ in 0..vectors {
        let pos = conn.io().read_i32()? as u32;
        let status = read_status_vector(conn.io())?;
        batch_errors.push(BatchError {
            message_index: pos,
            error: DatabaseError::new(status),
        });
    }
    // Lista simples de posições com erro (quando os detalhes não são pedidos).
    for _ in 0..errors {
        let pos = conn.io().read_i32()? as u32;
        if !batch_errors.iter().any(|e| e.message_index == pos) {
            let empty = StatusVector {
                args: Vec::new(),
                sql_state: None,
            };
            batch_errors.push(BatchError {
                message_index: pos,
                error: DatabaseError::new(empty),
            });
        }
    }

    Ok(BatchResult {
        total: reccount,
        update_counts,
        errors: batch_errors,
    })
}