firebird_wire/batch.rs
1//! DML em lote (batch) — o recurso "principal" de array DML do Firebird 4+.
2//!
3//! Um [`Batch`] insere/atualiza/exclui muitas linhas com uma única instrução
4//! preparada, acumulando mensagens no cliente e enviando-as ao servidor em uma
5//! ida só. É muito mais rápido que executar a instrução linha a linha.
6//!
7//! ```text
8//! let mut batch = conn.create_batch(&tx, "INSERT INTO t (a, b) VALUES (?, ?)")?;
9//! batch.add(&[Value::Int(1), Value::Text("um".into())])?;
10//! batch.add(&[Value::Int(2), Value::Text("dois".into())])?;
11//! let result = batch.execute(&mut conn, &tx)?; // envia + executa
12//! println!("{} linhas afetadas no total", result.total_affected());
13//! batch.close(&mut conn)?; // libera no servidor
14//! ```
15//!
16//! # Protocolo (FB4+, op codes 99–103)
17//!
18//! Descoberto por captura de um cliente C usando a interface OO `IBatch`:
19//! 1. `op_batch_create` (99): `stmt | blr(cstring) | msglen(u32) | pb(cstring)`.
20//! O BLR descreve o formato da mensagem (igual ao `in_blr` de `op_execute`);
21//! `msglen` é o tamanho do buffer de mensagem do lado do cliente.
22//! 2. `op_batch_msg` (100): `stmt | count(u32) | mensagens`, cada mensagem no
23//! mesmo formato compacto (bitmap de nulos + valores XDR) usado em `op_execute`.
24//! 3. `op_batch_exec` (101): `stmt | transaction`. Responde com `op_batch_cs`.
25//! 4. `op_batch_cs` (103): estado de conclusão (contagens por linha + erros).
26//! 5. `op_batch_rls` (102): libera o batch no servidor.
27//!
28//! O cliente C agrupa create+msg e usa `op_batch_sync` (110) para drenar as
29//! respostas adiadas; nós, sendo síncronos, lemos a resposta de cada op na hora.
30//!
31//! # BLOBs em lote (política STREAM, `op_batch_blob_stream` 105)
32//!
33//! Quando a instrução tem coluna BLOB, [`Connection::create_batch`] ativa a
34//! política `BLOB_STREAM` no PB (`TAG_BLOB_POLICY` = 3). O chamador então usa
35//! [`Batch::add_blob`] para registrar cada blob e recebe um id (quad) para pôr
36//! no campo da linha. Em [`Batch::execute`], antes das mensagens, enviamos:
37//!
38//! `op_batch_blob_stream` (105): `stmt | length(u32) | stream`. O `stream` é a
39//! concatenação CRUA (sem padding) dos blobs, cada um `id(quad 8B BE) | size(4B
40//! BE) | bpb_size(4B BE) | bpb | dados`. O `length` NÃO é o número de bytes no
41//! wire: é o tamanho do buffer que o servidor aloca — a soma de `align4(16 +
42//! bpb + dados)` de cada blob — e deve ser múltiplo de 4 (o servidor rejeita
43//! caso contrário). Ao percorrer o stream (ver `xdr_blob_stream` no servidor),
44//! ele avança o ponteiro do buffer com alinhamento de 4 SEM consumir bytes do
45//! wire para o padding; o laço termina quando o que resta é menor que um
46//! cabeçalho (16 B) ou chega a zero. Por isso o wire carrega menos bytes que
47//! `length`, e a próxima op pode começar em offset não múltiplo de 4.
48//! Confirmado por captura do `11.batch.cpp` (conteúdo 30 B → length 32; 33 B →
49//! 36) e pelo código do servidor. Os blobs vão **antes** das mensagens porque a
50//! linha referencia o id já registrado.
51//!
52//! `op_batch_regblob` (104): `stmt | id_existente(quad 8B BE) | id_batch(quad)`.
53//! Mapeia um BLOB já gravado fora do batch ([`Connection::write_blob`]) a um id
54//! local, evitando reenviar os dados. Ver [`Batch::register_blob`].
55//!
56//! `op_batch_set_bpb` (106): `stmt | bpb(cstring)`. Define o BPB padrão dos
57//! blobs do batch; o servidor lê o `isc_bpb_type` para marcar os blobs como
58//! segmentados ou stream. Ver [`Batch::set_default_bpb`] / [`Batch::set_segmented`].
59//! Em modo segmentado, cada segmento no stream é um `u32` big-endian com o
60//! comprimento seguido dos bytes, SEM padding; já o campo `size` do blob segue a
61//! contabilidade do buffer do servidor (cabeçalho de 2 bytes alinhado a 2), ou
62//! seja `align2(2 + len)`. Capturado da Parte 3 do `11.batch.cpp`.
63
64use crate::blr::message_blr;
65use crate::connection::Connection;
66use crate::error::{DatabaseError, Error, Result, StatusVector};
67use crate::message::{encode_row_into, message_buffer_len};
68use crate::transaction::Transaction;
69use crate::value::{ColumnMeta, Value};
70use crate::wire::consts::*;
71use crate::wire::response::{read_op, read_response, read_response_body, read_status_vector};
72use crate::wire::stream::{op_name, op_packet};
73use crate::wire::xdr::ParameterBuffer;
74
75/// Um lote de mensagens vinculado a uma instrução preparada no servidor.
76///
77/// Crie-o com [`Connection::create_batch`], acumule linhas com [`Batch::add`] e
78/// envie com [`Batch::execute`]. O batch pode ser reutilizado (adicione mais e
79/// execute de novo). Ao terminar, chame [`Batch::close`] para liberá-lo.
80pub struct Batch {
81 handle: i32,
82 params: Vec<ColumnMeta>,
83 /// Mensagens já codificadas mas ainda não enviadas/executadas.
84 pending: Vec<u8>,
85 pending_count: u32,
86 /// Stream de BLOBs acumulado (cabeçalho + dados de cada blob), ainda não
87 /// enviado. Só usado em batches com coluna BLOB (política STREAM). Os blobs
88 /// vão concatenados SEM padding no wire (o servidor reconstrói o alinhamento
89 /// no buffer dele).
90 pending_blobs: Vec<u8>,
91 /// Comprimento *alinhado* do stream de blobs (campo `p_batch_blob_data` do
92 /// op_batch_blob_stream): a soma de `align4(16 + dados)` de cada blob. É o
93 /// tamanho do buffer que o servidor aloca, e deve ser múltiplo de
94 /// `BLOB_ALIGN` (o servidor rejeita caso contrário). Sempre `>=` o número de
95 /// bytes efetivamente enviados.
96 blob_stream_len: usize,
97 /// Registros pendentes de BLOBs pré-existentes (`op_batch_regblob`): cada par
98 /// é `(id_existente, id_no_batch)`. Enviados em [`Batch::execute`] antes das
99 /// mensagens, junto com o stream de blobs.
100 pending_regblobs: Vec<(u64, u64)>,
101 /// Próximo id (quad) a atribuir em [`Batch::add_blob`]/[`Batch::register_blob`].
102 /// Começa em 1; o id 0 é reservado (quad nulo).
103 next_blob_id: u64,
104 /// Verdadeiro se a instrução tem ao menos uma coluna BLOB (política STREAM
105 /// ativada na criação).
106 blob_stream: bool,
107 /// Charset da conexão, usado para codificar parâmetros de texto em
108 /// [`Batch::add`] (capturado na criação, já que `add` não recebe a conexão).
109 charset: crate::charset::Charset,
110 /// BPB padrão a aplicar aos blobs do batch (`op_batch_set_bpb`), se definido.
111 /// Enviado em [`Batch::execute`] antes do stream de blobs.
112 default_bpb: Option<Vec<u8>>,
113 /// Verdadeiro quando o BPB padrão marca os blobs como segmentados; nesse caso
114 /// [`Batch::add_blob`] enquadra os dados em segmentos.
115 blob_segmented: bool,
116 closed: bool,
117}
118
119/// Alinhamento do cabeçalho de segmento em um blob segmentado
120/// (`IBatch::BLOB_SEGHDR_ALIGN`). Como cada `add_blob` segmentado começa numa
121/// fronteira de 4 bytes (já múltiplo de 2), não há padding a inserir.
122const BLOB_SEGHDR_ALIGN: usize = 2;
123const _: () = assert!(BLOB_ALIGN.is_multiple_of(BLOB_SEGHDR_ALIGN));
124
125/// Arredonda `n` para cima até um múltiplo de `align` (potência de 2).
126#[inline]
127fn align_up(n: usize, align: usize) -> usize {
128 (n + align - 1) & !(align - 1)
129}
130
131/// Verdadeiro se o BPB marca o blob como segmentado: clumplet `isc_bpb_type`(3)
132/// com valor `isc_bpb_type_segmented`(0). Formato do BPB: byte de versão seguido
133/// de clumplets `tag(1) | len(1) | valor(len)`.
134fn bpb_is_segmented(bpb: &[u8]) -> bool {
135 let mut i = 1; // pula o byte de versão
136 while i + 1 < bpb.len() {
137 let tag = bpb[i];
138 let len = bpb[i + 1] as usize;
139 let val = &bpb[i + 2..(i + 2 + len).min(bpb.len())];
140 if tag == bpb::TYPE {
141 return val.first() == Some(&bpb::TYPE_SEGMENTED);
142 }
143 i += 2 + len;
144 }
145 false
146}
147
148/// Alinhamento (em bytes) de cada blob dentro do stream de `op_batch_blob_stream`.
149/// O servidor FB5 usa 4 (confirmado por captura: conteúdo de 30 B → campo de
150/// comprimento 32; de 33 B → 36). É o valor que `IBatch::getBlobAlignment`
151/// retorna para o protocolo remoto.
152const BLOB_ALIGN: usize = 4;
153
154impl Batch {
155 /// O handle da instrução/batch no servidor.
156 pub fn handle(&self) -> i32 {
157 self.handle
158 }
159
160 /// Quantas mensagens estão acumuladas mas ainda não executadas.
161 pub fn pending(&self) -> u32 {
162 self.pending_count
163 }
164
165 /// Tamanho (alinhado) do stream de BLOBs acumulado mas ainda não enviado, em
166 /// bytes. Zera após [`Batch::execute`]/[`Batch::cancel`]. Útil para o chamador
167 /// drenar o lote por *volume de blob* (e não só por contagem de linhas) e assim
168 /// não estourar o buffer de batch do servidor (`TAG_BUFFER_BYTES_SIZE`).
169 pub fn blob_stream_len(&self) -> usize {
170 self.blob_stream_len
171 }
172
173 /// Os metadados dos parâmetros de entrada (a forma de cada linha esperada).
174 pub fn params(&self) -> &[ColumnMeta] {
175 &self.params
176 }
177
178 /// Registra um BLOB no stream do lote e devolve seu id (quad), que deve ser
179 /// posto no campo BLOB da linha correspondente como [`Value::Blob`].
180 ///
181 /// Só funciona em batches cuja instrução tem coluna BLOB (a política STREAM é
182 /// ativada automaticamente em [`Connection::create_batch`] nesse caso). Como
183 /// [`Self::add`], apenas acumula na memória; os blobs vão à rede em
184 /// [`Self::execute`], **antes** das mensagens que os referenciam.
185 ///
186 /// ```text
187 /// let id = batch.add_blob(b"conteudo do blob")?;
188 /// batch.add(&[Value::Text("BAT31".into()), Value::Blob(id)])?;
189 /// ```
190 pub fn add_blob(&mut self, data: &[u8]) -> Result<u64> {
191 if !self.blob_stream {
192 return Err(Error::protocol(
193 "add_blob exige uma coluna BLOB na instrução do batch",
194 ));
195 }
196 let id = self.next_blob_id;
197 self.next_blob_id += 1;
198 // Cabeçalho do blob, tudo big-endian: id(quad 8B) | size(4B) | bpb_size(4B
199 // = 0, pois o BPB é do batch). Os blobs vão concatenados SEM padding; o
200 // servidor reconstrói o alinhamento no buffer dele.
201 self.pending_blobs.extend_from_slice(&id.to_be_bytes());
202 if self.blob_segmented {
203 // Blob segmentado (um único segmento aqui). NO WIRE: cada segmento é
204 // `u32` big-endian com o comprimento + os bytes, sem padding. Mas o
205 // campo `size` (e o comprimento do stream) seguem a contabilidade do
206 // *buffer* do servidor, que usa cabeçalho de 2 bytes alinhado a
207 // `BLOB_SEGHDR_ALIGN`: `size = align2(2 + len)`. Ver `xdr_blob_stream`.
208 let size_field = align_up(2 + data.len(), BLOB_SEGHDR_ALIGN);
209 self.pending_blobs
210 .extend_from_slice(&(size_field as u32).to_be_bytes());
211 self.pending_blobs.extend_from_slice(&0u32.to_be_bytes()); // bpb_size
212 self.pending_blobs
213 .extend_from_slice(&(data.len() as u32).to_be_bytes());
214 self.pending_blobs.extend_from_slice(data);
215 self.blob_stream_len += align_up(16 + size_field, BLOB_ALIGN);
216 } else {
217 // Blob de stream (não segmentado): dados crus.
218 self.pending_blobs
219 .extend_from_slice(&(data.len() as u32).to_be_bytes());
220 self.pending_blobs.extend_from_slice(&0u32.to_be_bytes()); // bpb_size
221 self.pending_blobs.extend_from_slice(data);
222 self.blob_stream_len += align_up(16 + data.len(), BLOB_ALIGN);
223 }
224 Ok(id)
225 }
226
227 /// Define o BPB (blob parameter buffer) padrão dos blobs do batch
228 /// (`op_batch_set_bpb`). O uso típico é marcar os blobs como **segmentados**
229 /// (`isc_bpb_type = isc_bpb_type_segmented`): nesse caso [`Self::add_blob`]
230 /// passa a enquadrar cada blob como um segmento. Deve ser chamado antes de
231 /// [`Self::add_blob`]; o BPB vai à rede em [`Self::execute`], antes dos blobs.
232 ///
233 /// Conveniência: [`Self::set_segmented`] monta o BPB segmentado para você.
234 pub fn set_default_bpb(&mut self, bpb: Vec<u8>) -> Result<()> {
235 if !self.blob_stream {
236 return Err(Error::protocol(
237 "set_default_bpb exige uma coluna BLOB na instrução do batch",
238 ));
239 }
240 // Detecta o tipo segmentado: clumplet `isc_bpb_type`(3) com valor
241 // `isc_bpb_type_segmented`(0). BPB usa comprimento de 1 byte por clumplet.
242 self.blob_segmented = bpb_is_segmented(&bpb);
243 self.default_bpb = Some(bpb);
244 Ok(())
245 }
246
247 /// Atalho para [`Self::set_default_bpb`] com um BPB que marca os blobs como
248 /// segmentados (`segmentado = true`) ou stream (`false`, o padrão).
249 pub fn set_segmented(&mut self, segmented: bool) -> Result<()> {
250 let kind = if segmented {
251 bpb::TYPE_SEGMENTED
252 } else {
253 bpb::TYPE_STREAM
254 };
255 // BPB no formato que o fbclient envia: versão(1) | tag isc_bpb_type(3) |
256 // len(1) = 4 | valor(4 bytes LE). Capturado do `11.batch.cpp` Parte 3.
257 self.set_default_bpb(vec![BPB_VERSION1, bpb::TYPE, 4, kind, 0, 0, 0])
258 }
259
260 /// Mapeia um BLOB **já existente** (criado fora do batch, p.ex. via
261 /// [`Connection::write_blob`]/[`Connection::create_blob`]) para um id local
262 /// do batch e devolve esse id, que deve ir no campo BLOB da linha como
263 /// [`Value::Blob`]. Útil para reaproveitar BLOBs grandes já gravados sem
264 /// reenviá-los pelo stream (`op_batch_regblob`).
265 ///
266 /// Como [`Self::add_blob`], só vale em batches com coluna BLOB; o registro é
267 /// acumulado e enviado em [`Self::execute`], antes das mensagens.
268 pub fn register_blob(&mut self, existing_id: u64) -> Result<u64> {
269 if !self.blob_stream {
270 return Err(Error::protocol(
271 "register_blob exige uma coluna BLOB na instrução do batch",
272 ));
273 }
274 let batch_id = self.next_blob_id;
275 self.next_blob_id += 1;
276 self.pending_regblobs.push((existing_id, batch_id));
277 Ok(batch_id)
278 }
279
280 /// Adiciona uma linha ao lote. Os valores devem corresponder, em número e
281 /// tipo, aos parâmetros da instrução (ver [`Self::params`]). Apenas acumula
282 /// na memória; nada vai à rede até [`Self::execute`].
283 pub fn add(&mut self, values: &[Value]) -> Result<()> {
284 // Codifica direto no buffer pendente (sem Vec temporário por linha). Em erro,
285 // `encode_row_into` restaura o tamanho de `pending`, então não corrompe o lote.
286 encode_row_into(&mut self.pending, &self.params, values, self.charset)?;
287 self.pending_count += 1;
288 Ok(())
289 }
290
291 /// Envia as mensagens acumuladas e executa o lote, retornando o estado de
292 /// conclusão (contagens por linha e erros por linha). Esvazia o buffer
293 /// pendente; o batch pode então ser reutilizado.
294 pub fn execute(&mut self, conn: &mut Connection, tx: &Transaction) -> Result<BatchResult> {
295 if self.closed {
296 return Err(Error::protocol("batch já foi fechado"));
297 }
298 // 0a. Define o BPB padrão (op_batch_set_bpb), se houver, antes dos blobs.
299 if let Some(bpb) = self.default_bpb.take() {
300 let mut w = op_packet(op::BATCH_SET_BPB);
301 w.put_i32(self.handle);
302 w.put_bytes(&bpb); // cstring: len(4) + bpb + pad
303 conn.io().send(&w)?;
304 read_response(conn.io())?;
305 }
306 // 0b. Envia os BLOBs pendentes (op_batch_blob_stream) ANTES das mensagens,
307 // pois cada mensagem referencia um id de blob já registrado.
308 if !self.pending_blobs.is_empty() {
309 let mut w = op_packet(op::BATCH_BLOB_STREAM);
310 w.put_i32(self.handle);
311 // Comprimento alinhado (≥ bytes no wire), depois os blobs crus. Não
312 // alinhamos o pacote: o servidor recompõe o alinhamento internamente
313 // e a próxima op pode começar em offset não múltiplo de 4.
314 w.put_i32(self.blob_stream_len as i32);
315 w.put_raw(&self.pending_blobs);
316 conn.io().send(&w)?;
317 read_response(conn.io())?;
318 self.pending_blobs.clear();
319 self.blob_stream_len = 0;
320 }
321 // 0b. Registra BLOBs pré-existentes (op_batch_regblob), também antes das
322 // mensagens. Layout: stmt | id_existente(quad 8B BE) | id_batch(quad).
323 if !self.pending_regblobs.is_empty() {
324 for (existing_id, batch_id) in std::mem::take(&mut self.pending_regblobs) {
325 let mut w = op_packet(op::BATCH_REGBLOB);
326 w.put_i32(self.handle);
327 w.put_raw(&existing_id.to_be_bytes());
328 w.put_raw(&batch_id.to_be_bytes());
329 conn.io().send(&w)?;
330 read_response(conn.io())?;
331 }
332 }
333 // 1. Envia as mensagens pendentes (op_batch_msg), se houver.
334 if self.pending_count > 0 {
335 let mut w = op_packet(op::BATCH_MSG);
336 w.put_i32(self.handle);
337 w.put_i32(self.pending_count as i32);
338 w.put_raw(&self.pending);
339 w.align();
340 conn.io().send(&w)?;
341 read_response(conn.io())?;
342 self.pending.clear();
343 self.pending_count = 0;
344 }
345
346 // 2. Executa o lote (op_batch_exec) e lê o estado de conclusão.
347 let mut w = op_packet(op::BATCH_EXEC);
348 w.put_i32(self.handle);
349 w.put_i32(tx.handle());
350 conn.io().send(&w)?;
351 read_batch_cs(conn)
352 }
353
354 /// Descarta as mensagens acumuladas que ainda não foram executadas
355 /// (`op_batch_cancel`). Não afeta linhas já executadas em chamadas anteriores.
356 pub fn cancel(&mut self, conn: &mut Connection) -> Result<()> {
357 self.pending.clear();
358 self.pending_count = 0;
359 self.pending_blobs.clear();
360 self.blob_stream_len = 0;
361 self.pending_regblobs.clear();
362 let mut w = op_packet(op::BATCH_CANCEL);
363 w.put_i32(self.handle);
364 conn.io().send(&w)?;
365 read_response(conn.io())?;
366 Ok(())
367 }
368
369 /// Libera o batch e a instrução preparada no servidor (`op_batch_rls` +
370 /// `op_free_statement` com `DSQL_drop`), consumindo o handle.
371 pub fn close(mut self, conn: &mut Connection) -> Result<()> {
372 self.closed = true;
373 let mut w = op_packet(op::BATCH_RLS);
374 w.put_i32(self.handle);
375 conn.io().send(&w)?;
376 read_response(conn.io())?;
377
378 let mut w = op_packet(op::FREE_STATEMENT);
379 w.put_i32(self.handle);
380 w.put_i32(free::DROP);
381 conn.io().send(&w)?;
382 read_response(conn.io())?;
383 Ok(())
384 }
385}
386
387impl Drop for Batch {
388 fn drop(&mut self) {
389 if !self.closed {
390 crate::warn_unclosed("Batch", self.handle);
391 }
392 }
393}
394
395/// Opções de criação de um [`Batch`] (clumplets do `op_batch_create`).
396#[derive(Debug, Clone, Copy)]
397pub struct BatchOptions {
398 /// Se `true`, o servidor CONTINUA após uma linha falhar, executando as demais e
399 /// reportando o erro de cada uma em [`BatchResult::errors`]. Para isso ele
400 /// bracketa cada linha num savepoint interno — o que tem **custo por linha**.
401 ///
402 /// Se `false` (**padrão**), o lote PARA na primeira linha que falha (fail-fast):
403 /// é bem mais rápido (sem savepoint por linha) e é o que se quer quando a
404 /// transação é "tudo ou nada" e qualquer erro aborta a operação inteira.
405 pub multierror: bool,
406 /// Se `true` (**padrão**), o servidor reporta a contagem de linhas afetadas por
407 /// mensagem em [`BatchResult::update_counts`] (e portanto [`BatchResult::total_affected`]).
408 pub record_counts: bool,
409 /// Tamanho máximo (em bytes) do buffer de batch que o servidor aloca
410 /// (`TAG_BUFFER_BYTES_SIZE`). `None` (**padrão**) deixa o servidor usar o seu
411 /// default. Aumentar permite acumular mais mensagens/blobs por `execute` sem
412 /// estourar o buffer (o estouro aparece como "invalid BLOB ID"); o servidor
413 /// impõe um teto próprio (256 MiB no Firebird).
414 pub buffer_bytes: Option<u32>,
415}
416
417impl Default for BatchOptions {
418 fn default() -> Self {
419 // Fail-fast por padrão: a maioria dos usos de DML em lote quer abortar no
420 // primeiro erro, e o multierror (savepoint por linha) só compensa quando o
421 // chamador realmente vai inspecionar o erro de cada linha.
422 BatchOptions {
423 multierror: false,
424 record_counts: true,
425 buffer_bytes: None,
426 }
427 }
428}
429
430impl BatchOptions {
431 /// Opções padrão (fail-fast, com contagens por linha).
432 pub fn new() -> Self {
433 Self::default()
434 }
435
436 /// Liga/desliga o modo multierro (continuar após erros por linha).
437 pub fn multierror(mut self, on: bool) -> Self {
438 self.multierror = on;
439 self
440 }
441
442 /// Liga/desliga o reporte de contagens de linhas afetadas por mensagem.
443 pub fn record_counts(mut self, on: bool) -> Self {
444 self.record_counts = on;
445 self
446 }
447
448 /// Define o tamanho do buffer de batch do servidor (`TAG_BUFFER_BYTES_SIZE`),
449 /// em bytes. Use para acomodar lotes maiores (mais mensagens/blobs por
450 /// `execute`) sem estourar o buffer. O servidor impõe um teto (256 MiB no
451 /// Firebird); valores acima são limitados por ele.
452 pub fn buffer_bytes(mut self, bytes: u32) -> Self {
453 self.buffer_bytes = Some(bytes);
454 self
455 }
456}
457
458impl Connection {
459 /// Prepara uma instrução e cria um lote (batch) sobre ela com as opções padrão
460 /// ([`BatchOptions::default`]: fail-fast, com contagens por linha). A instrução
461 /// deve ter parâmetros (`?`) — cada [`Batch::add`] fornece uma linha de valores.
462 ///
463 /// Para continuar após erros por linha e coletá-los todos, use
464 /// [`Self::create_batch_with`] com `BatchOptions::new().multierror(true)`.
465 pub fn create_batch(&mut self, tx: &Transaction, sql: &str) -> Result<Batch> {
466 self.create_batch_with(tx, sql, BatchOptions::default())
467 }
468
469 /// Como [`Self::create_batch`], mas com [`BatchOptions`] explícitas.
470 pub fn create_batch_with(
471 &mut self,
472 tx: &Transaction,
473 sql: &str,
474 opts: BatchOptions,
475 ) -> Result<Batch> {
476 let mut stmt = self.prepare(tx, sql)?;
477 let handle = stmt.handle();
478 let params: Vec<ColumnMeta> = stmt.params().to_vec();
479 // O handle passa a viver no Batch (liberado por Batch::close), então
480 // marcamos como transferido para não disparar o aviso de Drop e soltamos
481 // o wrapper aqui (libera só memória).
482 stmt.forget_handle();
483 drop(stmt);
484
485 let blr = message_blr(¶ms);
486 let msglen = message_buffer_len(¶ms);
487
488 // Se houver coluna BLOB, ativa a política STREAM: os blobs são enviados
489 // em `op_batch_blob_stream` e a linha referencia o id (ver `add_blob`).
490 let blob_stream = params
491 .iter()
492 .any(|c| sql_type::base(c.sql_type) == sql_type::BLOB);
493
494 // Buffer de parâmetros do batch: byte de versão (1) seguido de clumplets
495 // com comprimento LE de 4 bytes. Os tags são opcionais (ver `BatchOptions`).
496 let mut pb = ParameterBuffer::new(1);
497 if opts.record_counts {
498 pb.bytes_be_len4(batch_tag::RECORD_COUNTS, &1u32.to_le_bytes());
499 }
500 if opts.multierror {
501 pb.bytes_be_len4(batch_tag::MULTIERROR, &1u32.to_le_bytes());
502 }
503 if let Some(bytes) = opts.buffer_bytes {
504 pb.bytes_be_len4(batch_tag::BUFFER_BYTES_SIZE, &bytes.to_le_bytes());
505 }
506 if blob_stream {
507 pb.bytes_be_len4(
508 batch_tag::BLOB_POLICY,
509 &(blob_policy::STREAM as u32).to_le_bytes(),
510 );
511 }
512
513 let mut w = op_packet(op::BATCH_CREATE);
514 w.put_i32(handle);
515 w.put_bytes(&blr); // cstring: len(4) + blr + pad
516 w.put_i32(msglen as i32);
517 w.put_bytes(pb.as_slice()); // cstring: len(4) + pb + pad
518 self.io().send(&w)?;
519 read_response(self.io())?;
520
521 Ok(Batch {
522 handle,
523 params,
524 pending: Vec::new(),
525 pending_count: 0,
526 pending_blobs: Vec::new(),
527 blob_stream_len: 0,
528 pending_regblobs: Vec::new(),
529 next_blob_id: 1,
530 blob_stream,
531 charset: self.charset(),
532 default_bpb: None,
533 blob_segmented: false,
534 closed: false,
535 })
536 }
537}
538
539/// Resultado da execução de um lote: o estado de conclusão por mensagem.
540#[derive(Debug, Clone, Default)]
541pub struct BatchResult {
542 /// Total de mensagens processadas nesta execução.
543 pub total: u32,
544 /// Contagem de linhas afetadas por mensagem, na ordem em que foram
545 /// adicionadas. `>= 0` é o número de linhas; [`batch_cs::EXECUTE_FAILED`]
546 /// (−1) marca uma mensagem que falhou; [`batch_cs::SUCCESS_NO_INFO`] (−2)
547 /// indica sucesso sem contagem reportada.
548 pub update_counts: Vec<i32>,
549 /// Erros detalhados por mensagem (índice da mensagem + erro do servidor).
550 pub errors: Vec<BatchError>,
551}
552
553impl BatchResult {
554 /// Verdadeiro se nenhuma mensagem falhou.
555 pub fn all_succeeded(&self) -> bool {
556 self.errors.is_empty() && !self.update_counts.contains(&batch_cs::EXECUTE_FAILED)
557 }
558
559 /// Soma das linhas afetadas pelas mensagens bem-sucedidas (ignora as que
560 /// falharam ou não reportaram contagem).
561 pub fn total_affected(&self) -> u64 {
562 self.update_counts
563 .iter()
564 .filter(|&&c| c >= 0)
565 .map(|&c| c as u64)
566 .sum()
567 }
568}
569
570/// Um erro detalhado de uma mensagem específica do lote.
571#[derive(Debug, Clone)]
572pub struct BatchError {
573 /// Índice (base zero) da mensagem que falhou, na ordem de adição.
574 pub message_index: u32,
575 /// O erro reportado pelo servidor para essa mensagem.
576 pub error: DatabaseError,
577}
578
579/// Lê a resposta `op_batch_cs` de um `op_batch_exec`.
580///
581/// Layout (confirmado por captura, inclusive com erros forçados):
582/// `op | stmt | reccount | updates | vectors | errors |`
583/// `updates×i32 (contagens) | vectors×(pos u32 + status vector) | errors×u32`.
584fn read_batch_cs(conn: &mut Connection) -> Result<BatchResult> {
585 let code = read_op(conn.io())?;
586 if code == op::RESPONSE {
587 // Falha global (não por linha) veio como op_response.
588 read_response_body(conn.io())?.into_result()?;
589 return Err(Error::protocol(
590 "op_batch_exec retornou op_response sem erro",
591 ));
592 }
593 if code != op::BATCH_CS {
594 return Err(Error::protocol(format!(
595 "esperava op_batch_cs, veio {} ({code})",
596 op_name(code)
597 )));
598 }
599
600 let _stmt = conn.io().read_i32()?;
601 let reccount = conn.io().read_i32()? as u32;
602 let updates = conn.io().read_i32()? as u32;
603 let vectors = conn.io().read_i32()? as u32;
604 let errors = conn.io().read_i32()? as u32;
605
606 let mut update_counts = Vec::with_capacity(updates as usize);
607 for _ in 0..updates {
608 update_counts.push(conn.io().read_i32()?);
609 }
610
611 let mut batch_errors = Vec::with_capacity(vectors as usize);
612 for _ in 0..vectors {
613 let pos = conn.io().read_i32()? as u32;
614 let status = read_status_vector(conn.io())?;
615 batch_errors.push(BatchError {
616 message_index: pos,
617 error: DatabaseError::new(status),
618 });
619 }
620 // Lista simples de posições com erro (quando os detalhes não são pedidos).
621 for _ in 0..errors {
622 let pos = conn.io().read_i32()? as u32;
623 if !batch_errors.iter().any(|e| e.message_index == pos) {
624 let empty = StatusVector {
625 args: Vec::new(),
626 sql_state: None,
627 };
628 batch_errors.push(BatchError {
629 message_index: pos,
630 error: DatabaseError::new(empty),
631 });
632 }
633 }
634
635 Ok(BatchResult {
636 total: reccount,
637 update_counts,
638 errors: batch_errors,
639 })
640}
641
642#[cfg(test)]
643mod tests {
644 use super::*;
645
646 /// Cria um `Batch` "de mentira" (sem conexão/servidor) só para exercitar a
647 /// contabilidade EM MEMÓRIA de `add_blob`. `closed: true` evita o aviso de Drop.
648 fn fake_blob_batch() -> Batch {
649 Batch {
650 handle: 0,
651 params: Vec::new(),
652 pending: Vec::new(),
653 pending_count: 0,
654 pending_blobs: Vec::new(),
655 blob_stream_len: 0,
656 pending_regblobs: Vec::new(),
657 next_blob_id: 1,
658 blob_stream: true,
659 charset: crate::charset::Charset::Utf8,
660 default_bpb: None,
661 blob_segmented: false,
662 closed: true,
663 }
664 }
665
666 /// Bytes que cada blob (não-segmentado) ocupa no buffer do servidor:
667 /// `align4(16 + dados)` (cabeçalho de 16 B + dados, alinhado a `BLOB_ALIGN`).
668 fn entry_len(data_len: usize) -> usize {
669 align_up(16 + data_len, BLOB_ALIGN)
670 }
671
672 #[test]
673 fn add_blob_atribui_ids_sequenciais_a_partir_de_1() {
674 let mut b = fake_blob_batch();
675 assert_eq!(b.add_blob(b"a").unwrap(), 1);
676 assert_eq!(b.add_blob(b"bb").unwrap(), 2);
677 assert_eq!(b.add_blob(b"ccc").unwrap(), 3);
678 }
679
680 #[test]
681 fn add_blob_exige_coluna_blob() {
682 let mut b = fake_blob_batch();
683 b.blob_stream = false;
684 assert!(b.add_blob(b"x").is_err());
685 }
686
687 /// O invariante do qual o flush por bytes depende: `blob_stream_len()` é a soma
688 /// de `align4(16 + dados)` de cada blob — inclusive (e principalmente) para
689 /// tamanhos NÃO múltiplos de 4, que arredondam pra cima.
690 #[test]
691 fn blob_stream_len_acumula_tamanhos_alinhados() {
692 let mut b = fake_blob_batch();
693 for data in [&b"x"[..], &b"yy"[..], &b"zzz"[..], &b"wwww"[..]] {
694 b.add_blob(data).unwrap();
695 }
696 let esperado = entry_len(1) + entry_len(2) + entry_len(3) + entry_len(4);
697 assert_eq!(b.blob_stream_len(), esperado);
698 // A soma é sempre múltipla de BLOB_ALIGN (cada parcela é).
699 assert_eq!(b.blob_stream_len() % BLOB_ALIGN, 0);
700 }
701
702 /// Casos pontuais do tamanho declarado para um único blob (cabeçalho 16 B +
703 /// dados, alinhado a 4): 30 -> align4(46)=48; 33 -> align4(49)=52.
704 #[test]
705 fn blob_stream_len_de_um_unico_blob() {
706 let mut b = fake_blob_batch();
707 b.add_blob(&[0u8; 30]).unwrap();
708 assert_eq!(b.blob_stream_len(), 48);
709
710 let mut b = fake_blob_batch();
711 b.add_blob(&[0u8; 33]).unwrap();
712 assert_eq!(b.blob_stream_len(), 52);
713 }
714
715 /// `BatchOptions` por padrão não fixa o buffer (deixa o default do servidor); o
716 /// builder `buffer_bytes` registra o limite a enviar como `TAG_BUFFER_BYTES_SIZE`.
717 #[test]
718 fn batch_options_buffer_bytes() {
719 assert_eq!(BatchOptions::new().buffer_bytes, None);
720 let opts = BatchOptions::new().buffer_bytes(16 * 1024 * 1024);
721 assert_eq!(opts.buffer_bytes, Some(16 * 1024 * 1024));
722 }
723}