use crate::error::{Error, Result};
use bytes::Bytes;
use filemode::{UnixMode, GoFileMode};
use h2::server::{self, SendResponse};
use http::{Request, Response, StatusCode};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context as TaskContext, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::sync::mpsc;
use prost::Message as ProstMessage;
use crate::proto::moby::buildkit::v1::BytesMessage;
use super::{FileSyncServer, AuthServer, SecretsServer};
/// Dispatches gRPC calls received over the session tunnel to the optional
/// file-sync, auth and secrets services.
pub struct GrpcTunnel {
/// Service backing `FileSync/DiffCopy`; when `None` that call is answered with an error.
file_sync: Option<FileSyncServer>,
/// Credential provider for `Auth/Credentials`; when `None` empty credentials are returned.
auth: Option<AuthServer>,
/// Secret provider for `Secrets/GetSecret`; when `None` the call fails with `SecretsNotConfigured`.
secrets: Option<SecretsServer>,
}
impl GrpcTunnel {
pub fn new(
_response_tx: mpsc::Sender<BytesMessage>,
file_sync: Option<FileSyncServer>,
auth: Option<AuthServer>,
secrets: Option<SecretsServer>,
) -> Self {
Self {
file_sync,
auth,
secrets,
}
}
pub async fn serve(
self,
inbound_rx: mpsc::Receiver<BytesMessage>,
outbound_tx: mpsc::Sender<BytesMessage>,
) -> Result<()> {
let tunnel = Arc::new(self);
let stream = MessageStream::new(inbound_rx, outbound_tx);
let mut h2_conn = server::handshake(stream).await
.map_err(|e| Error::Http2Handshake { source: e })?;
tracing::info!("HTTP/2 server started in session tunnel");
while let Some(result) = h2_conn.accept().await {
let (request, respond) = result.map_err(|e| Error::Http2Stream { source: e })?;
let tunnel_ref = Arc::clone(&tunnel);
tokio::spawn(async move {
if let Err(e) = tunnel_ref.handle_request(request, respond).await {
tracing::error!("Failed to handle gRPC request: {}", e);
}
});
}
Ok(())
}
async fn handle_request(
&self,
req: Request<h2::RecvStream>,
respond: SendResponse<Bytes>,
) -> Result<()> {
let method = req.uri().path().to_string();
tracing::info!("Received gRPC call: {}", method);
eprintln!("\n=== Request Headers for {} ===", method);
for (name, value) in req.headers() {
if let Ok(v) = value.to_str() {
eprintln!(" {}: {}", name, v);
}
}
let dir_name = req.headers()
.get("dir-name")
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string());
let followpaths: Vec<String> = req.headers()
.get_all("followpaths")
.iter()
.filter_map(|v| v.to_str().ok())
.map(|s| s.to_string())
.collect();
let body = req.into_body();
match method.as_str() {
"/grpc.health.v1.Health/Check" => {
let payload = Self::read_unary_request(body).await?;
let response_payload = self.handle_health_check(payload).await?;
self.send_success_response(respond, response_payload).await
}
"/moby.filesync.v1.FileSync/DiffCopy" => {
self.handle_file_sync_diff_copy_stream(body, respond, dir_name, followpaths).await
}
"/moby.filesync.v1.Auth/GetTokenAuthority" => {
tracing::info!("Auth.GetTokenAuthority called - returning not implemented");
self.send_error_response(respond, "Token auth not implemented").await
}
"/moby.filesync.v1.Auth/Credentials" => {
let payload = Self::read_unary_request(body).await?;
let response_payload = self.handle_auth_credentials(payload).await?;
self.send_success_response(respond, response_payload).await
}
"/moby.filesync.v1.Auth/FetchToken" => {
let payload = Self::read_unary_request(body).await?;
let response_payload = self.handle_auth_fetch_token(payload).await?;
self.send_success_response(respond, response_payload).await
}
"/moby.buildkit.secrets.v1.Secrets/GetSecret" => {
let payload = Self::read_unary_request(body).await?;
let response_payload = self.handle_secrets_get_secret(payload).await?;
self.send_success_response(respond, response_payload).await
}
_ => {
tracing::warn!("Unknown gRPC method: {}", method);
self.send_error_response(respond, "Unimplemented").await
}
}
}
async fn read_unary_request(mut body: h2::RecvStream) -> Result<Bytes> {
let mut request_data = Vec::new();
while let Some(chunk) = body.data().await {
let chunk = chunk.map_err(|e| Error::Http2Stream { source: e })?;
request_data.extend_from_slice(&chunk);
let _ = body.flow_control().release_capacity(chunk.len());
}
let payload = if request_data.len() > 5 {
Bytes::copy_from_slice(&request_data[5..])
} else {
Bytes::new()
};
Ok(payload)
}
async fn send_success_response(
&self,
mut respond: SendResponse<Bytes>,
payload: Bytes,
) -> Result<()> {
let response = Response::builder()
.status(StatusCode::OK)
.header("content-type", "application/grpc")
.body(())
.unwrap();
let mut send_stream = respond.send_response(response, false)
.map_err(|e| Error::Http2Stream { source: e })?;
let mut framed = Vec::new();
framed.push(0); framed.extend_from_slice(&(payload.len() as u32).to_be_bytes());
framed.extend_from_slice(&payload);
send_stream.send_data(Bytes::from(framed), false)
.map_err(|e| Error::Http2Stream { source: e })?;
let trailers = Response::builder()
.header("grpc-status", "0")
.body(())
.unwrap();
send_stream.send_trailers(trailers.headers().clone())
.map_err(|e| Error::Http2Stream { source: e })?;
Ok(())
}
async fn send_error_response(
&self,
mut respond: SendResponse<Bytes>,
message: &str,
) -> Result<()> {
let response = Response::builder()
.status(StatusCode::OK)
.header("content-type", "application/grpc")
.header("grpc-status", "12") .header("grpc-message", message)
.body(())
.unwrap();
respond.send_response(response, true)
.map_err(|e| Error::Http2Stream { source: e })?;
Ok(())
}
async fn handle_file_sync_diff_copy_stream(
&self,
mut request_stream: h2::RecvStream,
mut respond: SendResponse<Bytes>,
dir_name: Option<String>,
followpaths: Vec<String>,
) -> Result<()> {
use crate::proto::fsutil::types::{Packet, packet::PacketType};
use prost::Message as ProstMessage;
use std::sync::atomic::{AtomicU32, Ordering};
static CALL_COUNTER: AtomicU32 = AtomicU32::new(0);
let call_id = CALL_COUNTER.fetch_add(1, Ordering::SeqCst);
tracing::info!("handle_file_sync_diff_copy_stream called (call #{}, dir_name: {:?}, followpaths: {:?})", call_id, dir_name, followpaths);
eprintln!("\n========== DiffCopy Call #{} (dir_name: {:?}, followpaths: {:?}) ==========", call_id, dir_name, followpaths);
let file_sync = match &self.file_sync {
Some(fs) => fs,
None => {
tracing::error!("FileSync not available");
return self.send_error_response(respond, "FileSync not available").await;
}
};
tracing::info!("FileSync.DiffCopy streaming started (call #{})", call_id);
let response = Response::builder()
.status(StatusCode::OK)
.header("content-type", "application/grpc")
.body(())
.unwrap();
let mut send_stream = respond.send_response(response, false)
.map_err(|e| Error::Http2Stream { source: e })?;
tracing::info!("Sent response headers for DiffCopy");
let root_path = file_sync.get_root_path();
tracing::info!("Starting to send STAT packets from: {} (call #{})", root_path.display(), call_id);
eprintln!("Root path: {}, is_dir: {}", root_path.display(), root_path.is_dir());
use std::collections::HashMap;
let mut file_map = HashMap::new();
let mut id_counter = 0u32;
let send_only_dockerfile = dir_name.as_deref() == Some("dockerfile");
if send_only_dockerfile {
let dockerfile_name = if !followpaths.is_empty() && followpaths[0].ends_with(".Dockerfile") {
followpaths[0].clone()
} else {
"Dockerfile".to_string()
};
eprintln!("BuildKit requested 'dockerfile' - sending only {}", dockerfile_name);
use crate::proto::fsutil::types::{Packet, packet::PacketType, Stat};
let dockerfile_path = root_path.join(&dockerfile_name);
if !dockerfile_path.exists() {
tracing::error!("{} not found at {}", dockerfile_name, dockerfile_path.display());
let trailers = Response::builder()
.header("grpc-status", "2")
.header("grpc-message", format!("{} not found", dockerfile_name))
.body(())
.unwrap();
let _ = send_stream.send_trailers(trailers.headers().clone());
return Err(Error::PathNotFound(dockerfile_path.clone()));
}
let metadata = tokio::fs::metadata(&dockerfile_path).await?;
let mut stat = Stat {
path: dockerfile_name.clone(),
mode: 0,
uid: 0,
gid: 0,
size: metadata.len() as i64,
mod_time: 0,
linkname: String::new(),
devmajor: 0,
devminor: 0,
xattrs: std::collections::HashMap::new(),
};
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let unix_mode = metadata.permissions().mode();
stat.mode = GoFileMode::from(UnixMode::from(unix_mode)).as_u32();
}
#[cfg(not(unix))]
{
stat.mode = 0o644; }
let mode = stat.mode;
let stat_packet = Packet {
r#type: PacketType::PacketStat as i32,
stat: Some(stat),
id: 0,
data: vec![],
};
eprintln!("DFS: Sending STAT #0: {} (FILE, mode: 0o{:o})", dockerfile_name, mode);
Self::send_grpc_packet(&mut send_stream, &stat_packet).await?;
file_map.insert(0, dockerfile_path);
} else {
if followpaths.is_empty() {
eprintln!("BuildKit requested full context - sending entire directory tree");
} else {
eprintln!("BuildKit requested filtered context - followpaths: {:?}", followpaths);
}
if let Err(e) = Self::send_stat_packets_dfs(
root_path.clone(),
String::new(),
&mut send_stream,
&mut file_map,
&mut id_counter,
if followpaths.is_empty() { None } else { Some(&followpaths) },
).await {
tracing::error!("Error sending STAT packets: {}", e);
let trailers = Response::builder()
.header("grpc-status", "2")
.header("grpc-message", e.to_string())
.body(())
.unwrap();
let _ = send_stream.send_trailers(trailers.headers().clone());
return Err(e);
}
}
let final_stat_packet = Packet {
r#type: PacketType::PacketStat as i32,
stat: None,
id: 0,
data: vec![],
};
Self::send_grpc_packet(&mut send_stream, &final_stat_packet).await?;
tracing::info!("Sent all STAT packets (including final empty STAT), now waiting for REQ packets from BuildKit");
let mut buffer = Vec::new();
let mut received_fin = false;
loop {
match request_stream.data().await {
Some(Ok(chunk)) => {
buffer.extend_from_slice(&chunk);
let _ = request_stream.flow_control().release_capacity(chunk.len());
while buffer.len() >= 5 {
let compressed = buffer[0];
let length = u32::from_be_bytes([buffer[1], buffer[2], buffer[3], buffer[4]]) as usize;
if buffer.len() < 5 + length {
break;
}
let message_data = buffer[5..5+length].to_vec();
buffer.drain(0..5+length);
if compressed != 0 {
tracing::warn!("Received compressed message, skipping");
continue;
}
let packet = match Packet::decode(Bytes::from(message_data)) {
Ok(p) => p,
Err(e) => {
tracing::error!("Failed to decode packet: {}", e);
continue;
}
};
let packet_type = PacketType::try_from(packet.r#type).unwrap_or(PacketType::PacketStat);
tracing::debug!("Received packet type: {:?}, id: {}, has_stat: {}",
packet_type, packet.id, packet.stat.is_some());
match packet_type {
PacketType::PacketReq => {
tracing::info!("Received REQ packet with id: {}", packet.id);
if let Some(file_path) = file_map.get(&packet.id) {
tracing::info!("Sending file data for id {}: {}", packet.id, file_path.display());
if let Err(e) = Self::send_file_data_packets(file_path.clone(), packet.id, &mut send_stream).await {
tracing::error!("Failed to send file data: {}", e);
}
} else {
tracing::warn!("File ID {} not found in map (probably a directory, ignoring)", packet.id);
}
}
PacketType::PacketFin => {
tracing::info!("Received FIN packet from BuildKit, ending transfer");
received_fin = true;
break;
}
_ => {
tracing::debug!("Ignoring packet type: {:?}", packet_type);
}
}
}
if received_fin {
break;
}
}
Some(Err(e)) => {
tracing::error!("Error reading request stream: {}", e);
break;
}
None => {
tracing::info!("Request stream ended");
break;
}
}
}
tracing::info!("DiffCopy completed, sending FIN packet");
let fin_packet = Packet {
r#type: PacketType::PacketFin as i32,
stat: None,
id: 0,
data: vec![],
};
Self::send_grpc_packet(&mut send_stream, &fin_packet).await?;
tracing::debug!("Sent final FIN packet");
let trailers = Response::builder()
.header("grpc-status", "0")
.body(())
.unwrap();
send_stream.send_trailers(trailers.headers().clone())
.map_err(|e| Error::Http2Stream { source: e })?;
Ok(())
}
/// Walks `path` depth-first in name order and sends one STAT packet per
/// entry, recording the id of every regular file in `file_map`.
///
/// * `prefix` – relative path of `path` inside the synced tree (`""` for the root).
/// * `id_counter` – next id to assign; advanced for every entry that is sent.
/// * `followpaths` – when present, only these paths, their ancestor
///   directories and everything beneath a followed directory are sent.
///
/// # Errors
/// Propagates I/O errors from reading directories, metadata or link targets
/// and errors from writing to `stream`.
fn send_stat_packets_dfs<'a>(
    path: std::path::PathBuf,
    prefix: String,
    stream: &'a mut h2::SendStream<Bytes>,
    file_map: &'a mut std::collections::HashMap<u32, std::path::PathBuf>,
    id_counter: &'a mut u32,
    followpaths: Option<&'a Vec<String>>,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> {
    Box::pin(async move {
        // Built once for the whole walk. Key: relative path that may be sent.
        // Value: `true` when the path was requested explicitly (its whole
        // subtree is sent), `false` when it is only an ancestor of a request.
        let include = followpaths.map(|paths| {
            let mut map: std::collections::HashMap<String, bool> =
                std::collections::HashMap::with_capacity(paths.len());
            for requested in paths {
                let mut parent = requested.as_str();
                while let Some(idx) = parent.rfind('/') {
                    parent = &parent[..idx];
                    map.entry(parent.to_string()).or_insert(false);
                }
                map.insert(requested.clone(), true);
            }
            map
        });
        if let Some(map) = &include {
            tracing::debug!("Built include_paths set with {} entries: {:?}", map.len(), map);
        }
        Self::walk_and_send_stats(
            path,
            prefix,
            stream,
            file_map,
            id_counter,
            include.as_ref(),
            false,
        )
        .await
    })
}

/// Recursive worker behind `send_stat_packets_dfs`; `inside_followed_dir` is
/// true once the walk is beneath an explicitly requested directory.
fn walk_and_send_stats<'a>(
    dir: std::path::PathBuf,
    prefix: String,
    stream: &'a mut h2::SendStream<Bytes>,
    file_map: &'a mut std::collections::HashMap<u32, std::path::PathBuf>,
    id_counter: &'a mut u32,
    include: Option<&'a std::collections::HashMap<String, bool>>,
    inside_followed_dir: bool,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> {
    Box::pin(async move {
        use crate::proto::fsutil::types::{packet::PacketType, Packet, Stat};
        tracing::debug!("send_stat_packets_dfs: {} (prefix: {})", dir.display(), prefix);

        let mut entries = Vec::new();
        let mut dir_entries = tokio::fs::read_dir(&dir).await?;
        while let Some(entry) = dir_entries.next_entry().await? {
            let name = entry.file_name().to_string_lossy().into_owned();
            let metadata = entry.metadata().await?;
            entries.push((name, entry.path(), metadata));
        }
        // Directory listings come in arbitrary order; sort for a stable sequence.
        entries.sort_by(|a, b| a.0.cmp(&b.0));

        for (name, entry_path, metadata) in entries {
            let rel_path = if prefix.is_empty() {
                name
            } else {
                format!("{}/{}", prefix, name)
            };

            // Decide whether to send this entry and whether everything below it is wanted.
            let subtree_followed = match include {
                None => true,
                Some(_) if inside_followed_dir => true,
                Some(map) => match map.get(&rel_path) {
                    Some(&whole_subtree) => whole_subtree,
                    None => {
                        tracing::debug!("Skipping {} (not in followpaths)", rel_path);
                        continue;
                    }
                },
            };

            let entry_id = *id_counter;
            *id_counter += 1;

            // A symlink entry is only meaningful together with its target.
            let linkname = if metadata.file_type().is_symlink() {
                tokio::fs::read_link(&entry_path)
                    .await?
                    .to_string_lossy()
                    .into_owned()
            } else {
                String::new()
            };

            let mut stat = Stat {
                path: rel_path.clone(),
                mode: 0,
                uid: 0,
                gid: 0,
                size: if metadata.is_dir() { 0 } else { metadata.len() as i64 },
                mod_time: 0,
                linkname,
                devmajor: 0,
                devminor: 0,
                xattrs: std::collections::HashMap::new(),
            };
            #[cfg(unix)]
            {
                use std::os::unix::fs::PermissionsExt;
                let unix_mode = metadata.permissions().mode();
                stat.mode = GoFileMode::from(UnixMode::from(unix_mode)).as_u32();
            }
            #[cfg(not(unix))]
            {
                stat.mode = if metadata.is_dir() {
                    0x80000000 | 0o755
                } else {
                    0o644
                };
            }
            let mode = stat.mode;
            let stat_packet = Packet {
                r#type: PacketType::PacketStat as i32,
                stat: Some(stat),
                id: entry_id,
                data: vec![],
            };
            tracing::info!("Sending STAT packet for: {} (id: {}, mode: 0o{:o})", rel_path, entry_id, mode);
            Self::send_grpc_packet(stream, &stat_packet).await?;

            if metadata.is_file() {
                file_map.insert(entry_id, entry_path);
            } else if metadata.is_dir() {
                Self::walk_and_send_stats(
                    entry_path,
                    rel_path,
                    stream,
                    file_map,
                    id_counter,
                    include,
                    subtree_followed,
                )
                .await?;
            }
        }
        Ok(())
    })
}
/// Recursively appends every entry below `path` to `result` as
/// `(relative path, absolute path, metadata)`, parents before their children.
///
/// `prefix` is the relative path of `path` itself (`""` for the root).
///
/// # Errors
/// Propagates I/O errors from reading directories or entry metadata.
fn collect_entries_recursive<'a>(
    path: std::path::PathBuf,
    prefix: String,
    result: &'a mut Vec<(String, std::path::PathBuf, std::fs::Metadata)>,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> {
    Box::pin(async move {
        tracing::debug!("Collecting entries from: {} (prefix: {})", path.display(), prefix);
        let mut reader = tokio::fs::read_dir(&path).await?;
        loop {
            let entry = match reader.next_entry().await? {
                Some(entry) => entry,
                None => break,
            };
            let name = entry.file_name().to_string_lossy().into_owned();
            let rel_path = match prefix.as_str() {
                "" => name,
                parent => format!("{}/{}", parent, name),
            };
            let entry_path = entry.path();
            let metadata = entry.metadata().await?;
            // Remember the kind before the metadata is moved into the result list.
            let descend = metadata.is_dir();
            result.push((rel_path.clone(), entry_path.clone(), metadata));
            if descend {
                Self::collect_entries_recursive(entry_path, rel_path, result).await?;
            }
        }
        Ok(())
    })
}
/// Streams the file at `path` as DATA packets tagged with `req_id`, in chunks
/// of at most 32 KiB, followed by an empty DATA packet that marks end of file.
///
/// # Errors
/// Propagates errors from opening or reading the file and from writing to `stream`.
async fn send_file_data_packets(
    path: std::path::PathBuf,
    req_id: u32,
    stream: &mut h2::SendStream<Bytes>,
) -> Result<()> {
    use crate::proto::fsutil::types::{packet::PacketType, Packet};
    use tokio::io::AsyncReadExt;

    const CHUNK_SIZE: usize = 32 * 1024;

    tracing::info!("Sending file data for: {} (id: {})", path.display(), req_id);
    let data_packet = |data: Vec<u8>| Packet {
        r#type: PacketType::PacketData as i32,
        stat: None,
        id: req_id,
        data,
    };

    let mut file = tokio::fs::File::open(&path).await?;
    let mut chunk = vec![0u8; CHUNK_SIZE];
    loop {
        match file.read(&mut chunk).await? {
            0 => break,
            n => Self::send_grpc_packet(stream, &data_packet(chunk[..n].to_vec())).await?,
        }
    }

    Self::send_grpc_packet(stream, &data_packet(Vec::new())).await?;
    tracing::debug!("Sent EOF (empty DATA) packet for id: {}", req_id);
    Ok(())
}
async fn send_grpc_packet(
stream: &mut h2::SendStream<Bytes>,
packet: &crate::proto::fsutil::types::Packet,
) -> Result<()> {
use prost::Message as ProstMessage;
use crate::proto::fsutil::types::packet::PacketType;
let mut payload = Vec::new();
packet.encode(&mut payload)?;
let mut framed = Vec::new();
framed.push(0); framed.extend_from_slice(&(payload.len() as u32).to_be_bytes());
framed.extend_from_slice(&payload);
let packet_type = PacketType::try_from(packet.r#type).ok();
tracing::trace!("Sending packet: type={:?}, id={}, data_len={}, total_frame_len={}",
packet_type, packet.id, packet.data.len(), framed.len());
stream.send_data(Bytes::from(framed), false)
.map_err(|e| Error::Http2Stream { source: e })?;
tokio::task::yield_now().await;
Ok(())
}
/// Decodes a `GetTokenAuthorityRequest` and answers with an empty public key.
///
/// # Errors
/// Returns a decode error for a malformed request, or an encoding error.
#[allow(dead_code)]
async fn handle_auth_get_token_authority(&self, payload: Bytes) -> Result<Bytes> {
    use crate::proto::moby::filesync::v1::{GetTokenAuthorityRequest, GetTokenAuthorityResponse};

    let request = GetTokenAuthorityRequest::decode(payload)
        .map_err(|e| Error::decode("GetTokenAuthorityRequest", e))?;
    tracing::info!("Auth.GetTokenAuthority request for host: {}", request.host);

    let reply = GetTokenAuthorityResponse {
        public_key: Vec::new(),
    };
    let mut encoded = Vec::new();
    reply.encode(&mut encoded)?;
    Ok(encoded.into())
}
/// Answers `Auth/Credentials` for the requested host.
///
/// Empty credentials are returned when no auth service is configured or when
/// the auth service reports an error; neither case fails the call.
///
/// # Errors
/// Returns a decode error for a malformed request, or an encoding error.
async fn handle_auth_credentials(&self, payload: Bytes) -> Result<Bytes> {
    use crate::proto::moby::filesync::v1::auth_server::Auth;
    use crate::proto::moby::filesync::v1::{CredentialsRequest, CredentialsResponse};
    use tonic::Request;

    let request = CredentialsRequest::decode(payload)
        .map_err(|e| Error::decode("CredentialsRequest", e))?;
    tracing::info!("Auth.Credentials request for host: {}", request.host);

    let empty = || CredentialsResponse {
        username: String::new(),
        secret: String::new(),
    };
    let response = match &self.auth {
        None => {
            tracing::debug!("No auth configured, returning empty credentials");
            empty()
        }
        Some(auth) => match auth.credentials(Request::new(request.clone())).await {
            Err(status) => {
                tracing::warn!("Failed to get credentials: {}, returning empty", status.message());
                empty()
            }
            Ok(resp) => {
                let inner = resp.into_inner();
                if inner.username.is_empty() {
                    tracing::debug!("No credentials found for host: {}, returning empty", request.host);
                } else {
                    tracing::debug!("Returning credentials for host: {} (username: {})",
                        request.host, inner.username);
                }
                inner
            }
        },
    };

    let mut encoded = Vec::new();
    response.encode(&mut encoded)?;
    Ok(Bytes::from(encoded))
}
/// Answers `Auth/FetchToken` with an empty token; the request payload is ignored.
///
/// # Errors
/// Returns an encoding error if the response cannot be serialized.
async fn handle_auth_fetch_token(&self, _payload: Bytes) -> Result<Bytes> {
    use crate::proto::moby::filesync::v1::FetchTokenResponse;

    tracing::info!("Auth.FetchToken called");
    let reply = FetchTokenResponse {
        token: String::new(),
        expires_in: 0,
        issued_at: 0,
    };
    let mut encoded = Vec::new();
    reply.encode(&mut encoded)?;
    Ok(encoded.into())
}
/// Answers `Secrets/GetSecret` by asking the configured secrets service.
///
/// # Errors
/// * A decode error for a malformed request.
/// * `Error::SecretsNotConfigured` when no secrets service is available.
/// * `Error::SecretNotFound` when the service reports an error for the id.
/// * An encoding error if the response cannot be serialized.
async fn handle_secrets_get_secret(&self, payload: Bytes) -> Result<Bytes> {
    use crate::proto::moby::secrets::v1::secrets_server::Secrets;
    use crate::proto::moby::secrets::v1::GetSecretRequest;
    use tonic::Request;

    let request = GetSecretRequest::decode(payload)
        .map_err(|e| Error::decode("GetSecretRequest", e))?;
    tracing::info!("Secrets.GetSecret request for ID: {}", request.id);

    let secrets = match self.secrets.as_ref() {
        Some(service) => service,
        None => {
            tracing::warn!("Secrets service not configured");
            return Err(Error::SecretsNotConfigured);
        }
    };
    let response = match secrets.get_secret(Request::new(request.clone())).await {
        Err(status) => {
            tracing::warn!("Secret '{}' not found: {}", request.id, status.message());
            return Err(Error::SecretNotFound(status.message().to_string()));
        }
        Ok(resp) => {
            let inner = resp.into_inner();
            tracing::debug!("Returning secret '{}' ({} bytes)", request.id, inner.data.len());
            inner
        }
    };

    let mut encoded = Vec::new();
    response.encode(&mut encoded)?;
    Ok(Bytes::from(encoded))
}
/// Answers a health check with a fixed two-byte body; the request payload is ignored.
async fn handle_health_check(&self, _payload: Bytes) -> Result<Bytes> {
    // Protobuf bytes for "field 1 = varint 1" — presumably the SERVING status; confirm against the health proto.
    const HEALTHY: &[u8] = &[0x08, 0x01];
    tracing::info!("Health check called");
    Ok(Bytes::from_static(HEALTHY))
}
}
/// Byte-stream adapter over a pair of `BytesMessage` channels, used as the
/// transport for the HTTP/2 server.
struct MessageStream {
/// Source of inbound messages; each message's `data` is served to readers.
inbound_rx: Arc<tokio::sync::Mutex<mpsc::Receiver<BytesMessage>>>,
/// Sink for outbound data; every write is sent as one `BytesMessage`.
outbound_tx: mpsc::Sender<BytesMessage>,
/// Bytes of the most recently received inbound message.
read_buffer: Vec<u8>,
/// Offset into `read_buffer` of the first byte not yet handed to a reader.
read_pos: usize,
}
impl MessageStream {
fn new(
inbound_rx: mpsc::Receiver<BytesMessage>,
outbound_tx: mpsc::Sender<BytesMessage>,
) -> Self {
Self {
inbound_rx: Arc::new(tokio::sync::Mutex::new(inbound_rx)),
outbound_tx,
read_buffer: Vec::new(),
read_pos: 0,
}
}
}
impl AsyncRead for MessageStream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut TaskContext<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
if self.read_pos < self.read_buffer.len() {
let remaining = &self.read_buffer[self.read_pos..];
let to_copy = remaining.len().min(buf.remaining());
buf.put_slice(&remaining[..to_copy]);
self.read_pos += to_copy;
if self.read_pos >= self.read_buffer.len() {
self.read_buffer.clear();
self.read_pos = 0;
}
return Poll::Ready(Ok(()));
}
let inbound_rx = self.inbound_rx.clone();
let mut rx = match inbound_rx.try_lock() {
Ok(rx) => rx,
Err(_) => return Poll::Pending,
};
match rx.poll_recv(cx) {
Poll::Ready(Some(msg)) => {
self.read_buffer = msg.data;
self.read_pos = 0;
let to_copy = self.read_buffer.len().min(buf.remaining());
buf.put_slice(&self.read_buffer[..to_copy]);
self.read_pos = to_copy;
Poll::Ready(Ok(()))
}
Poll::Ready(None) => Poll::Ready(Ok(())), Poll::Pending => Poll::Pending,
}
}
}
impl AsyncWrite for MessageStream {
fn poll_write(
self: Pin<&mut Self>,
_cx: &mut TaskContext<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
let msg = BytesMessage {
data: buf.to_vec(),
};
match self.outbound_tx.try_send(msg) {
Ok(()) => Poll::Ready(Ok(buf.len())),
Err(mpsc::error::TrySendError::Full(_)) => {
Poll::Pending
}
Err(mpsc::error::TrySendError::Closed(_)) => {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"Channel closed",
)))
}
}
}
fn poll_flush(
self: Pin<&mut Self>,
_cx: &mut TaskContext<'_>,
) -> Poll<std::io::Result<()>> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(
self: Pin<&mut Self>,
_cx: &mut TaskContext<'_>,
) -> Poll<std::io::Result<()>> {
Poll::Ready(Ok(()))
}
}