use crate::bridge::dispatch::BridgeResponse;
use crate::bridge::envelope::{ErrorCode, Response};
use crate::data::executor::core_loop::CoreLoop;
use crate::data::executor::response_codec::{self, DocumentRow};
use crate::data::executor::task::ExecutionTask;
impl CoreLoop {
pub(in crate::data::executor) fn send_document_rows_transformed(
&mut self,
task: &ExecutionTask,
result: &Vec<DocumentRow>,
chunk_size: usize,
) -> Response {
if result.len() <= chunk_size {
match response_codec::encode(result) {
Ok(payload) => self.response_with_payload(task, payload),
Err(e) => self.response_error(
task,
ErrorCode::Internal {
detail: e.to_string(),
},
),
}
} else {
self.stream_chunks_transformed(task, result, chunk_size)
}
}
pub(in crate::data::executor) fn send_document_rows_raw(
&mut self,
task: &ExecutionTask,
rows: &[(String, Vec<u8>)],
chunk_size: usize,
) -> Response {
if rows.len() <= chunk_size {
match response_codec::encode_raw_document_rows(rows) {
Ok(payload) => self.response_with_payload(task, payload),
Err(e) => self.response_error(
task,
ErrorCode::Internal {
detail: e.to_string(),
},
),
}
} else {
self.stream_chunks_raw(task, rows, chunk_size)
}
}
fn stream_chunks_transformed(
&mut self,
task: &ExecutionTask,
result: &[DocumentRow],
chunk_size: usize,
) -> Response {
let chunks: Vec<_> = result.chunks(chunk_size).collect();
let last_idx = chunks.len().saturating_sub(1);
for (i, chunk) in chunks.iter().enumerate() {
let is_last = i == last_idx;
match response_codec::encode(&chunk.to_vec()) {
Ok(payload) => {
if is_last {
return self.response_with_payload(task, payload);
}
let partial = self.response_partial(task, payload);
let _ = self.response_tx.try_push(BridgeResponse { inner: partial });
}
Err(e) => {
return self.response_error(
task,
ErrorCode::Internal {
detail: e.to_string(),
},
);
}
}
}
self.response_error(
task,
ErrorCode::Internal {
detail: "streaming response incomplete".into(),
},
)
}
fn stream_chunks_raw(
&mut self,
task: &ExecutionTask,
rows: &[(String, Vec<u8>)],
chunk_size: usize,
) -> Response {
let chunks: Vec<_> = rows.chunks(chunk_size).collect();
let last_idx = chunks.len().saturating_sub(1);
for (i, chunk) in chunks.iter().enumerate() {
let is_last = i == last_idx;
match response_codec::encode_raw_document_rows(chunk) {
Ok(payload) => {
if is_last {
return self.response_with_payload(task, payload);
}
let partial = self.response_partial(task, payload);
let _ = self.response_tx.try_push(BridgeResponse { inner: partial });
}
Err(e) => {
return self.response_error(
task,
ErrorCode::Internal {
detail: e.to_string(),
},
);
}
}
}
self.response_error(
task,
ErrorCode::Internal {
detail: "streaming response incomplete".into(),
},
)
}
}