firebird_wire/batch.rs
1//! DML em lote (batch) — o recurso "principal" de array DML do Firebird 4+.
2//!
3//! Um [`Batch`] insere/atualiza/exclui muitas linhas com uma única instrução
4//! preparada, acumulando mensagens no cliente e enviando-as ao servidor em uma
5//! ida só. É muito mais rápido que executar a instrução linha a linha.
6//!
7//! ```text
8//! let mut batch = conn.create_batch(&tx, "INSERT INTO t (a, b) VALUES (?, ?)")?;
9//! batch.add(&[Value::Int(1), Value::Text("um".into())])?;
10//! batch.add(&[Value::Int(2), Value::Text("dois".into())])?;
11//! let result = batch.execute(&mut conn, &tx)?; // envia + executa
12//! println!("{} linhas afetadas no total", result.total_affected());
13//! batch.close(&mut conn)?; // libera no servidor
14//! ```
15//!
16//! # Protocolo (FB4+, op codes 99–103)
17//!
18//! Descoberto por captura de um cliente C usando a interface OO `IBatch`:
19//! 1. `op_batch_create` (99): `stmt | blr(cstring) | msglen(u32) | pb(cstring)`.
20//! O BLR descreve o formato da mensagem (igual ao `in_blr` de `op_execute`);
21//! `msglen` é o tamanho do buffer de mensagem do lado do cliente.
22//! 2. `op_batch_msg` (100): `stmt | count(u32) | mensagens`, cada mensagem no
23//! mesmo formato compacto (bitmap de nulos + valores XDR) usado em `op_execute`.
24//! 3. `op_batch_exec` (101): `stmt | transaction`. Responde com `op_batch_cs`.
25//! 4. `op_batch_cs` (103): estado de conclusão (contagens por linha + erros).
26//! 5. `op_batch_rls` (102): libera o batch no servidor.
27//!
28//! O cliente C agrupa create+msg e usa `op_batch_sync` (110) para drenar as
29//! respostas adiadas; nós, sendo síncronos, lemos a resposta de cada op na hora.
30//!
31//! # BLOBs em lote (política STREAM, `op_batch_blob_stream` 105)
32//!
33//! Quando a instrução tem coluna BLOB, [`Connection::create_batch`] ativa a
34//! política `BLOB_STREAM` no PB (`TAG_BLOB_POLICY` = 3). O chamador então usa
35//! [`Batch::add_blob`] para registrar cada blob e recebe um id (quad) para pôr
36//! no campo da linha. Em [`Batch::execute`], antes das mensagens, enviamos:
37//!
38//! `op_batch_blob_stream` (105): `stmt | length(u32) | stream`. O `stream` é a
39//! concatenação CRUA (sem padding) dos blobs, cada um `id(quad 8B BE) | size(4B
40//! BE) | bpb_size(4B BE) | bpb | dados`. O `length` NÃO é o número de bytes no
41//! wire: é o tamanho do buffer que o servidor aloca — a soma de `align4(16 +
42//! bpb + dados)` de cada blob — e deve ser múltiplo de 4 (o servidor rejeita
43//! caso contrário). Ao percorrer o stream (ver `xdr_blob_stream` no servidor),
44//! ele avança o ponteiro do buffer com alinhamento de 4 SEM consumir bytes do
45//! wire para o padding; o laço termina quando o que resta é menor que um
46//! cabeçalho (16 B) ou chega a zero. Por isso o wire carrega menos bytes que
47//! `length`, e a próxima op pode começar em offset não múltiplo de 4.
48//! Confirmado por captura do `11.batch.cpp` (conteúdo 30 B → length 32; 33 B →
49//! 36) e pelo código do servidor. Os blobs vão **antes** das mensagens porque a
50//! linha referencia o id já registrado.
51//!
52//! `op_batch_regblob` (104): `stmt | id_existente(quad 8B BE) | id_batch(quad)`.
53//! Mapeia um BLOB já gravado fora do batch ([`Connection::write_blob`]) a um id
54//! local, evitando reenviar os dados. Ver [`Batch::register_blob`].
55//!
56//! `op_batch_set_bpb` (106): `stmt | bpb(cstring)`. Define o BPB padrão dos
57//! blobs do batch; o servidor lê o `isc_bpb_type` para marcar os blobs como
58//! segmentados ou stream. Ver [`Batch::set_default_bpb`] / [`Batch::set_segmented`].
59//! Em modo segmentado, cada segmento no stream é um `u32` big-endian com o
60//! comprimento seguido dos bytes, SEM padding; já o campo `size` do blob segue a
61//! contabilidade do buffer do servidor (cabeçalho de 2 bytes alinhado a 2), ou
62//! seja `align2(2 + len)`. Capturado da Parte 3 do `11.batch.cpp`.
63
64use crate::blr::message_blr;
65use crate::connection::Connection;
66use crate::error::{DatabaseError, Error, Result, StatusVector};
67use crate::message::{encode_row_into, message_buffer_len};
68use crate::transaction::Transaction;
69use crate::value::{ColumnMeta, Value};
70use crate::wire::consts::*;
71use crate::wire::response::{read_op, read_response, read_response_body, read_status_vector};
72use crate::wire::stream::{op_name, op_packet};
73use crate::wire::xdr::ParameterBuffer;
74
75/// Um lote de mensagens vinculado a uma instrução preparada no servidor.
76///
77/// Crie-o com [`Connection::create_batch`], acumule linhas com [`Batch::add`] e
78/// envie com [`Batch::execute`]. O batch pode ser reutilizado (adicione mais e
79/// execute de novo). Ao terminar, chame [`Batch::close`] para liberá-lo.
80pub struct Batch {
81 handle: i32,
82 params: Vec<ColumnMeta>,
83 /// Mensagens já codificadas mas ainda não enviadas/executadas.
84 pending: Vec<u8>,
85 pending_count: u32,
86 /// Stream de BLOBs acumulado (cabeçalho + dados de cada blob), ainda não
87 /// enviado. Só usado em batches com coluna BLOB (política STREAM). Os blobs
88 /// vão concatenados SEM padding no wire (o servidor reconstrói o alinhamento
89 /// no buffer dele).
90 pending_blobs: Vec<u8>,
91 /// Comprimento *alinhado* do stream de blobs (campo `p_batch_blob_data` do
92 /// op_batch_blob_stream): a soma de `align4(16 + dados)` de cada blob. É o
93 /// tamanho do buffer que o servidor aloca, e deve ser múltiplo de
94 /// `BLOB_ALIGN` (o servidor rejeita caso contrário). Sempre `>=` o número de
95 /// bytes efetivamente enviados.
96 blob_stream_len: usize,
97 /// Registros pendentes de BLOBs pré-existentes (`op_batch_regblob`): cada par
98 /// é `(id_existente, id_no_batch)`. Enviados em [`Batch::execute`] antes das
99 /// mensagens, junto com o stream de blobs.
100 pending_regblobs: Vec<(u64, u64)>,
101 /// Próximo id (quad) a atribuir em [`Batch::add_blob`]/[`Batch::register_blob`].
102 /// Começa em 1; o id 0 é reservado (quad nulo).
103 next_blob_id: u64,
104 /// Verdadeiro se a instrução tem ao menos uma coluna BLOB (política STREAM
105 /// ativada na criação).
106 blob_stream: bool,
107 /// Charset da conexão, usado para codificar parâmetros de texto em
108 /// [`Batch::add`] (capturado na criação, já que `add` não recebe a conexão).
109 charset: crate::charset::Charset,
110 /// BPB padrão a aplicar aos blobs do batch (`op_batch_set_bpb`), se definido.
111 /// Enviado em [`Batch::execute`] antes do stream de blobs.
112 default_bpb: Option<Vec<u8>>,
113 /// Verdadeiro quando o BPB padrão marca os blobs como segmentados; nesse caso
114 /// [`Batch::add_blob`] enquadra os dados em segmentos.
115 blob_segmented: bool,
116 closed: bool,
117}
118
119/// Alinhamento do cabeçalho de segmento em um blob segmentado
120/// (`IBatch::BLOB_SEGHDR_ALIGN`). Como cada `add_blob` segmentado começa numa
121/// fronteira de 4 bytes (já múltiplo de 2), não há padding a inserir.
122const BLOB_SEGHDR_ALIGN: usize = 2;
123const _: () = assert!(BLOB_ALIGN.is_multiple_of(BLOB_SEGHDR_ALIGN));
124
125/// Arredonda `n` para cima até um múltiplo de `align` (potência de 2).
126#[inline]
127fn align_up(n: usize, align: usize) -> usize {
128 (n + align - 1) & !(align - 1)
129}
130
131/// Verdadeiro se o BPB marca o blob como segmentado: clumplet `isc_bpb_type`(3)
132/// com valor `isc_bpb_type_segmented`(0). Formato do BPB: byte de versão seguido
133/// de clumplets `tag(1) | len(1) | valor(len)`.
134fn bpb_is_segmented(bpb: &[u8]) -> bool {
135 let mut i = 1; // pula o byte de versão
136 while i + 1 < bpb.len() {
137 let tag = bpb[i];
138 let len = bpb[i + 1] as usize;
139 let val = &bpb[i + 2..(i + 2 + len).min(bpb.len())];
140 if tag == bpb::TYPE {
141 return val.first() == Some(&bpb::TYPE_SEGMENTED);
142 }
143 i += 2 + len;
144 }
145 false
146}
147
148/// Alinhamento (em bytes) de cada blob dentro do stream de `op_batch_blob_stream`.
149/// O servidor FB5 usa 4 (confirmado por captura: conteúdo de 30 B → campo de
150/// comprimento 32; de 33 B → 36). É o valor que `IBatch::getBlobAlignment`
151/// retorna para o protocolo remoto.
152const BLOB_ALIGN: usize = 4;
153
154impl Batch {
155 /// O handle da instrução/batch no servidor.
156 pub fn handle(&self) -> i32 {
157 self.handle
158 }
159
160 /// Quantas mensagens estão acumuladas mas ainda não executadas.
161 pub fn pending(&self) -> u32 {
162 self.pending_count
163 }
164
165 /// Os metadados dos parâmetros de entrada (a forma de cada linha esperada).
166 pub fn params(&self) -> &[ColumnMeta] {
167 &self.params
168 }
169
170 /// Registra um BLOB no stream do lote e devolve seu id (quad), que deve ser
171 /// posto no campo BLOB da linha correspondente como [`Value::Blob`].
172 ///
173 /// Só funciona em batches cuja instrução tem coluna BLOB (a política STREAM é
174 /// ativada automaticamente em [`Connection::create_batch`] nesse caso). Como
175 /// [`Self::add`], apenas acumula na memória; os blobs vão à rede em
176 /// [`Self::execute`], **antes** das mensagens que os referenciam.
177 ///
178 /// ```text
179 /// let id = batch.add_blob(b"conteudo do blob")?;
180 /// batch.add(&[Value::Text("BAT31".into()), Value::Blob(id)])?;
181 /// ```
182 pub fn add_blob(&mut self, data: &[u8]) -> Result<u64> {
183 if !self.blob_stream {
184 return Err(Error::protocol(
185 "add_blob exige uma coluna BLOB na instrução do batch",
186 ));
187 }
188 let id = self.next_blob_id;
189 self.next_blob_id += 1;
190 // Cabeçalho do blob, tudo big-endian: id(quad 8B) | size(4B) | bpb_size(4B
191 // = 0, pois o BPB é do batch). Os blobs vão concatenados SEM padding; o
192 // servidor reconstrói o alinhamento no buffer dele.
193 self.pending_blobs.extend_from_slice(&id.to_be_bytes());
194 if self.blob_segmented {
195 // Blob segmentado (um único segmento aqui). NO WIRE: cada segmento é
196 // `u32` big-endian com o comprimento + os bytes, sem padding. Mas o
197 // campo `size` (e o comprimento do stream) seguem a contabilidade do
198 // *buffer* do servidor, que usa cabeçalho de 2 bytes alinhado a
199 // `BLOB_SEGHDR_ALIGN`: `size = align2(2 + len)`. Ver `xdr_blob_stream`.
200 let size_field = align_up(2 + data.len(), BLOB_SEGHDR_ALIGN);
201 self.pending_blobs
202 .extend_from_slice(&(size_field as u32).to_be_bytes());
203 self.pending_blobs.extend_from_slice(&0u32.to_be_bytes()); // bpb_size
204 self.pending_blobs
205 .extend_from_slice(&(data.len() as u32).to_be_bytes());
206 self.pending_blobs.extend_from_slice(data);
207 self.blob_stream_len += align_up(16 + size_field, BLOB_ALIGN);
208 } else {
209 // Blob de stream (não segmentado): dados crus.
210 self.pending_blobs
211 .extend_from_slice(&(data.len() as u32).to_be_bytes());
212 self.pending_blobs.extend_from_slice(&0u32.to_be_bytes()); // bpb_size
213 self.pending_blobs.extend_from_slice(data);
214 self.blob_stream_len += align_up(16 + data.len(), BLOB_ALIGN);
215 }
216 Ok(id)
217 }
218
219 /// Define o BPB (blob parameter buffer) padrão dos blobs do batch
220 /// (`op_batch_set_bpb`). O uso típico é marcar os blobs como **segmentados**
221 /// (`isc_bpb_type = isc_bpb_type_segmented`): nesse caso [`Self::add_blob`]
222 /// passa a enquadrar cada blob como um segmento. Deve ser chamado antes de
223 /// [`Self::add_blob`]; o BPB vai à rede em [`Self::execute`], antes dos blobs.
224 ///
225 /// Conveniência: [`Self::set_segmented`] monta o BPB segmentado para você.
226 pub fn set_default_bpb(&mut self, bpb: Vec<u8>) -> Result<()> {
227 if !self.blob_stream {
228 return Err(Error::protocol(
229 "set_default_bpb exige uma coluna BLOB na instrução do batch",
230 ));
231 }
232 // Detecta o tipo segmentado: clumplet `isc_bpb_type`(3) com valor
233 // `isc_bpb_type_segmented`(0). BPB usa comprimento de 1 byte por clumplet.
234 self.blob_segmented = bpb_is_segmented(&bpb);
235 self.default_bpb = Some(bpb);
236 Ok(())
237 }
238
239 /// Atalho para [`Self::set_default_bpb`] com um BPB que marca os blobs como
240 /// segmentados (`segmentado = true`) ou stream (`false`, o padrão).
241 pub fn set_segmented(&mut self, segmented: bool) -> Result<()> {
242 let kind = if segmented {
243 bpb::TYPE_SEGMENTED
244 } else {
245 bpb::TYPE_STREAM
246 };
247 // BPB no formato que o fbclient envia: versão(1) | tag isc_bpb_type(3) |
248 // len(1) = 4 | valor(4 bytes LE). Capturado do `11.batch.cpp` Parte 3.
249 self.set_default_bpb(vec![BPB_VERSION1, bpb::TYPE, 4, kind, 0, 0, 0])
250 }
251
252 /// Mapeia um BLOB **já existente** (criado fora do batch, p.ex. via
253 /// [`Connection::write_blob`]/[`Connection::create_blob`]) para um id local
254 /// do batch e devolve esse id, que deve ir no campo BLOB da linha como
255 /// [`Value::Blob`]. Útil para reaproveitar BLOBs grandes já gravados sem
256 /// reenviá-los pelo stream (`op_batch_regblob`).
257 ///
258 /// Como [`Self::add_blob`], só vale em batches com coluna BLOB; o registro é
259 /// acumulado e enviado em [`Self::execute`], antes das mensagens.
260 pub fn register_blob(&mut self, existing_id: u64) -> Result<u64> {
261 if !self.blob_stream {
262 return Err(Error::protocol(
263 "register_blob exige uma coluna BLOB na instrução do batch",
264 ));
265 }
266 let batch_id = self.next_blob_id;
267 self.next_blob_id += 1;
268 self.pending_regblobs.push((existing_id, batch_id));
269 Ok(batch_id)
270 }
271
272 /// Adiciona uma linha ao lote. Os valores devem corresponder, em número e
273 /// tipo, aos parâmetros da instrução (ver [`Self::params`]). Apenas acumula
274 /// na memória; nada vai à rede até [`Self::execute`].
275 pub fn add(&mut self, values: &[Value]) -> Result<()> {
276 // Codifica direto no buffer pendente (sem Vec temporário por linha). Em erro,
277 // `encode_row_into` restaura o tamanho de `pending`, então não corrompe o lote.
278 encode_row_into(&mut self.pending, &self.params, values, self.charset)?;
279 self.pending_count += 1;
280 Ok(())
281 }
282
283 /// Envia as mensagens acumuladas e executa o lote, retornando o estado de
284 /// conclusão (contagens por linha e erros por linha). Esvazia o buffer
285 /// pendente; o batch pode então ser reutilizado.
286 pub fn execute(&mut self, conn: &mut Connection, tx: &Transaction) -> Result<BatchResult> {
287 if self.closed {
288 return Err(Error::protocol("batch já foi fechado"));
289 }
290 // 0a. Define o BPB padrão (op_batch_set_bpb), se houver, antes dos blobs.
291 if let Some(bpb) = self.default_bpb.take() {
292 let mut w = op_packet(op::BATCH_SET_BPB);
293 w.put_i32(self.handle);
294 w.put_bytes(&bpb); // cstring: len(4) + bpb + pad
295 conn.io().send(&w)?;
296 read_response(conn.io())?;
297 }
298 // 0b. Envia os BLOBs pendentes (op_batch_blob_stream) ANTES das mensagens,
299 // pois cada mensagem referencia um id de blob já registrado.
300 if !self.pending_blobs.is_empty() {
301 let mut w = op_packet(op::BATCH_BLOB_STREAM);
302 w.put_i32(self.handle);
303 // Comprimento alinhado (≥ bytes no wire), depois os blobs crus. Não
304 // alinhamos o pacote: o servidor recompõe o alinhamento internamente
305 // e a próxima op pode começar em offset não múltiplo de 4.
306 w.put_i32(self.blob_stream_len as i32);
307 w.put_raw(&self.pending_blobs);
308 conn.io().send(&w)?;
309 read_response(conn.io())?;
310 self.pending_blobs.clear();
311 self.blob_stream_len = 0;
312 }
313 // 0b. Registra BLOBs pré-existentes (op_batch_regblob), também antes das
314 // mensagens. Layout: stmt | id_existente(quad 8B BE) | id_batch(quad).
315 if !self.pending_regblobs.is_empty() {
316 for (existing_id, batch_id) in std::mem::take(&mut self.pending_regblobs) {
317 let mut w = op_packet(op::BATCH_REGBLOB);
318 w.put_i32(self.handle);
319 w.put_raw(&existing_id.to_be_bytes());
320 w.put_raw(&batch_id.to_be_bytes());
321 conn.io().send(&w)?;
322 read_response(conn.io())?;
323 }
324 }
325 // 1. Envia as mensagens pendentes (op_batch_msg), se houver.
326 if self.pending_count > 0 {
327 let mut w = op_packet(op::BATCH_MSG);
328 w.put_i32(self.handle);
329 w.put_i32(self.pending_count as i32);
330 w.put_raw(&self.pending);
331 w.align();
332 conn.io().send(&w)?;
333 read_response(conn.io())?;
334 self.pending.clear();
335 self.pending_count = 0;
336 }
337
338 // 2. Executa o lote (op_batch_exec) e lê o estado de conclusão.
339 let mut w = op_packet(op::BATCH_EXEC);
340 w.put_i32(self.handle);
341 w.put_i32(tx.handle());
342 conn.io().send(&w)?;
343 read_batch_cs(conn)
344 }
345
346 /// Descarta as mensagens acumuladas que ainda não foram executadas
347 /// (`op_batch_cancel`). Não afeta linhas já executadas em chamadas anteriores.
348 pub fn cancel(&mut self, conn: &mut Connection) -> Result<()> {
349 self.pending.clear();
350 self.pending_count = 0;
351 self.pending_blobs.clear();
352 self.blob_stream_len = 0;
353 self.pending_regblobs.clear();
354 let mut w = op_packet(op::BATCH_CANCEL);
355 w.put_i32(self.handle);
356 conn.io().send(&w)?;
357 read_response(conn.io())?;
358 Ok(())
359 }
360
361 /// Libera o batch e a instrução preparada no servidor (`op_batch_rls` +
362 /// `op_free_statement` com `DSQL_drop`), consumindo o handle.
363 pub fn close(mut self, conn: &mut Connection) -> Result<()> {
364 self.closed = true;
365 let mut w = op_packet(op::BATCH_RLS);
366 w.put_i32(self.handle);
367 conn.io().send(&w)?;
368 read_response(conn.io())?;
369
370 let mut w = op_packet(op::FREE_STATEMENT);
371 w.put_i32(self.handle);
372 w.put_i32(free::DROP);
373 conn.io().send(&w)?;
374 read_response(conn.io())?;
375 Ok(())
376 }
377}
378
379impl Drop for Batch {
380 fn drop(&mut self) {
381 if !self.closed {
382 crate::warn_unclosed("Batch", self.handle);
383 }
384 }
385}
386
387/// Opções de criação de um [`Batch`] (clumplets do `op_batch_create`).
388#[derive(Debug, Clone, Copy)]
389pub struct BatchOptions {
390 /// Se `true`, o servidor CONTINUA após uma linha falhar, executando as demais e
391 /// reportando o erro de cada uma em [`BatchResult::errors`]. Para isso ele
392 /// bracketa cada linha num savepoint interno — o que tem **custo por linha**.
393 ///
394 /// Se `false` (**padrão**), o lote PARA na primeira linha que falha (fail-fast):
395 /// é bem mais rápido (sem savepoint por linha) e é o que se quer quando a
396 /// transação é "tudo ou nada" e qualquer erro aborta a operação inteira.
397 pub multierror: bool,
398 /// Se `true` (**padrão**), o servidor reporta a contagem de linhas afetadas por
399 /// mensagem em [`BatchResult::update_counts`] (e portanto [`BatchResult::total_affected`]).
400 pub record_counts: bool,
401}
402
403impl Default for BatchOptions {
404 fn default() -> Self {
405 // Fail-fast por padrão: a maioria dos usos de DML em lote quer abortar no
406 // primeiro erro, e o multierror (savepoint por linha) só compensa quando o
407 // chamador realmente vai inspecionar o erro de cada linha.
408 BatchOptions {
409 multierror: false,
410 record_counts: true,
411 }
412 }
413}
414
415impl BatchOptions {
416 /// Opções padrão (fail-fast, com contagens por linha).
417 pub fn new() -> Self {
418 Self::default()
419 }
420
421 /// Liga/desliga o modo multierro (continuar após erros por linha).
422 pub fn multierror(mut self, on: bool) -> Self {
423 self.multierror = on;
424 self
425 }
426
427 /// Liga/desliga o reporte de contagens de linhas afetadas por mensagem.
428 pub fn record_counts(mut self, on: bool) -> Self {
429 self.record_counts = on;
430 self
431 }
432}
433
434impl Connection {
435 /// Prepara uma instrução e cria um lote (batch) sobre ela com as opções padrão
436 /// ([`BatchOptions::default`]: fail-fast, com contagens por linha). A instrução
437 /// deve ter parâmetros (`?`) — cada [`Batch::add`] fornece uma linha de valores.
438 ///
439 /// Para continuar após erros por linha e coletá-los todos, use
440 /// [`Self::create_batch_with`] com `BatchOptions::new().multierror(true)`.
441 pub fn create_batch(&mut self, tx: &Transaction, sql: &str) -> Result<Batch> {
442 self.create_batch_with(tx, sql, BatchOptions::default())
443 }
444
445 /// Como [`Self::create_batch`], mas com [`BatchOptions`] explícitas.
446 pub fn create_batch_with(
447 &mut self,
448 tx: &Transaction,
449 sql: &str,
450 opts: BatchOptions,
451 ) -> Result<Batch> {
452 let mut stmt = self.prepare(tx, sql)?;
453 let handle = stmt.handle();
454 let params: Vec<ColumnMeta> = stmt.params().to_vec();
455 // O handle passa a viver no Batch (liberado por Batch::close), então
456 // marcamos como transferido para não disparar o aviso de Drop e soltamos
457 // o wrapper aqui (libera só memória).
458 stmt.forget_handle();
459 drop(stmt);
460
461 let blr = message_blr(¶ms);
462 let msglen = message_buffer_len(¶ms);
463
464 // Se houver coluna BLOB, ativa a política STREAM: os blobs são enviados
465 // em `op_batch_blob_stream` e a linha referencia o id (ver `add_blob`).
466 let blob_stream = params
467 .iter()
468 .any(|c| sql_type::base(c.sql_type) == sql_type::BLOB);
469
470 // Buffer de parâmetros do batch: byte de versão (1) seguido de clumplets
471 // com comprimento LE de 4 bytes. Os tags são opcionais (ver `BatchOptions`).
472 let mut pb = ParameterBuffer::new(1);
473 if opts.record_counts {
474 pb.bytes_be_len4(batch_tag::RECORD_COUNTS, &1u32.to_le_bytes());
475 }
476 if opts.multierror {
477 pb.bytes_be_len4(batch_tag::MULTIERROR, &1u32.to_le_bytes());
478 }
479 if blob_stream {
480 pb.bytes_be_len4(
481 batch_tag::BLOB_POLICY,
482 &(blob_policy::STREAM as u32).to_le_bytes(),
483 );
484 }
485
486 let mut w = op_packet(op::BATCH_CREATE);
487 w.put_i32(handle);
488 w.put_bytes(&blr); // cstring: len(4) + blr + pad
489 w.put_i32(msglen as i32);
490 w.put_bytes(pb.as_slice()); // cstring: len(4) + pb + pad
491 self.io().send(&w)?;
492 read_response(self.io())?;
493
494 Ok(Batch {
495 handle,
496 params,
497 pending: Vec::new(),
498 pending_count: 0,
499 pending_blobs: Vec::new(),
500 blob_stream_len: 0,
501 pending_regblobs: Vec::new(),
502 next_blob_id: 1,
503 blob_stream,
504 charset: self.charset(),
505 default_bpb: None,
506 blob_segmented: false,
507 closed: false,
508 })
509 }
510}
511
512/// Resultado da execução de um lote: o estado de conclusão por mensagem.
513#[derive(Debug, Clone, Default)]
514pub struct BatchResult {
515 /// Total de mensagens processadas nesta execução.
516 pub total: u32,
517 /// Contagem de linhas afetadas por mensagem, na ordem em que foram
518 /// adicionadas. `>= 0` é o número de linhas; [`batch_cs::EXECUTE_FAILED`]
519 /// (−1) marca uma mensagem que falhou; [`batch_cs::SUCCESS_NO_INFO`] (−2)
520 /// indica sucesso sem contagem reportada.
521 pub update_counts: Vec<i32>,
522 /// Erros detalhados por mensagem (índice da mensagem + erro do servidor).
523 pub errors: Vec<BatchError>,
524}
525
526impl BatchResult {
527 /// Verdadeiro se nenhuma mensagem falhou.
528 pub fn all_succeeded(&self) -> bool {
529 self.errors.is_empty() && !self.update_counts.contains(&batch_cs::EXECUTE_FAILED)
530 }
531
532 /// Soma das linhas afetadas pelas mensagens bem-sucedidas (ignora as que
533 /// falharam ou não reportaram contagem).
534 pub fn total_affected(&self) -> u64 {
535 self.update_counts
536 .iter()
537 .filter(|&&c| c >= 0)
538 .map(|&c| c as u64)
539 .sum()
540 }
541}
542
543/// Um erro detalhado de uma mensagem específica do lote.
544#[derive(Debug, Clone)]
545pub struct BatchError {
546 /// Índice (base zero) da mensagem que falhou, na ordem de adição.
547 pub message_index: u32,
548 /// O erro reportado pelo servidor para essa mensagem.
549 pub error: DatabaseError,
550}
551
552/// Lê a resposta `op_batch_cs` de um `op_batch_exec`.
553///
554/// Layout (confirmado por captura, inclusive com erros forçados):
555/// `op | stmt | reccount | updates | vectors | errors |`
556/// `updates×i32 (contagens) | vectors×(pos u32 + status vector) | errors×u32`.
557fn read_batch_cs(conn: &mut Connection) -> Result<BatchResult> {
558 let code = read_op(conn.io())?;
559 if code == op::RESPONSE {
560 // Falha global (não por linha) veio como op_response.
561 read_response_body(conn.io())?.into_result()?;
562 return Err(Error::protocol(
563 "op_batch_exec retornou op_response sem erro",
564 ));
565 }
566 if code != op::BATCH_CS {
567 return Err(Error::protocol(format!(
568 "esperava op_batch_cs, veio {} ({code})",
569 op_name(code)
570 )));
571 }
572
573 let _stmt = conn.io().read_i32()?;
574 let reccount = conn.io().read_i32()? as u32;
575 let updates = conn.io().read_i32()? as u32;
576 let vectors = conn.io().read_i32()? as u32;
577 let errors = conn.io().read_i32()? as u32;
578
579 let mut update_counts = Vec::with_capacity(updates as usize);
580 for _ in 0..updates {
581 update_counts.push(conn.io().read_i32()?);
582 }
583
584 let mut batch_errors = Vec::with_capacity(vectors as usize);
585 for _ in 0..vectors {
586 let pos = conn.io().read_i32()? as u32;
587 let status = read_status_vector(conn.io())?;
588 batch_errors.push(BatchError {
589 message_index: pos,
590 error: DatabaseError::new(status),
591 });
592 }
593 // Lista simples de posições com erro (quando os detalhes não são pedidos).
594 for _ in 0..errors {
595 let pos = conn.io().read_i32()? as u32;
596 if !batch_errors.iter().any(|e| e.message_index == pos) {
597 let empty = StatusVector {
598 args: Vec::new(),
599 sql_state: None,
600 };
601 batch_errors.push(BatchError {
602 message_index: pos,
603 error: DatabaseError::new(empty),
604 });
605 }
606 }
607
608 Ok(BatchResult {
609 total: reccount,
610 update_counts,
611 errors: batch_errors,
612 })
613}