msy 0.4.6

Modern musl rsync alternative - Fast, parallel file synchronization
Documentation
//! Server mode - runs when invoked as `sy --server <path>`
//!
//! Uses streaming protocol (v2) for all operations.
//!
//! Code appears "dead" to the compiler since it's only used at runtime.
#![allow(dead_code)]

use anyhow::Result;
use bytes::Bytes;
use std::path::{Path, PathBuf};
use tokio::fs;
use tokio::io::{self, AsyncWriteExt};
use tokio::sync::mpsc;

use crate::streaming::{
	Generator, GeneratorConfig, Receiver, ReceiverConfig, Sender, SenderConfig,
	channel::file_job_channel,
	protocol::{self as v2, HelloFlags, MessageType},
};

/// Expand tilde (~) in paths to the user's home directory.
fn expand_tilde(path: &Path) -> PathBuf {
	let path_str = path.to_string_lossy();

	if path_str == "~" {
		dirs::home_dir().unwrap_or_else(|| PathBuf::from("."))
	} else if let Some(rest) = path_str.strip_prefix("~/") {
		if let Some(home) = dirs::home_dir() { home.join(rest) } else { path.to_path_buf() }
	} else {
		path.to_path_buf()
	}
}

/// Main server entry point
pub async fn run_server() -> Result<()> {
	let args: Vec<String> = std::env::args().collect();
	let raw_path = args.last().map(PathBuf::from).unwrap_or_else(|| PathBuf::from("."));

	let root_path = expand_tilde(&raw_path);

	if !root_path.exists() {
		std::fs::create_dir_all(&root_path)?;
	}

	let mut stdin = io::stdin();
	let mut stdout = io::stdout();

	// Read Hello frame
	let (msg_type, payload) = v2::read_frame(&mut stdin).await?;

	if msg_type != MessageType::Hello {
		let fatal = v2::Fatal { code: 1, message: format!("Expected HELLO, got {:?}", msg_type) };
		v2::write_frame(&mut stdout, &fatal.encode()).await?;
		stdout.flush().await?;
		return Ok(());
	}

	let hello = v2::Hello::decode(payload)?;

	// Ensure root exists
	if !root_path.exists() {
		fs::create_dir_all(&root_path).await?;
	}

	// Send Hello response
	let resp = v2::Hello::new(HelloFlags::empty(), "");
	v2::write_frame(&mut stdout, &resp.encode()).await?;
	stdout.flush().await?;

	if hello.flags.contains(HelloFlags::PULL) {
		run_server_pull(hello, root_path, stdin, stdout).await
	} else {
		run_server_push(hello, root_path, stdin, stdout).await
	}
}

/// Handle PULL mode: client pulls files from server (we are source)
async fn run_server_pull(hello: v2::Hello, root_path: PathBuf, mut stdin: impl io::AsyncRead + Unpin, mut stdout: impl io::AsyncWrite + Unpin) -> Result<()> {
	// 1. Receive DEST_FILE_ENTRY messages from client (Initial Exchange)
	let mut generator = Generator::new(GeneratorConfig {
		root: root_path.clone(),
		include_hidden: true,
		follow_symlinks: false,
		delete_enabled: hello.flags.contains(HelloFlags::DELETE),
	});

	loop {
		let (msg_type, payload) = v2::read_frame(&mut stdin).await?;
		match msg_type {
			MessageType::DestFileEntry => {
				let entry = v2::DestFileEntry::decode(payload)?;
				generator.add_dest_entry(entry);
			}
			MessageType::DestFileEnd => break,
			_ => anyhow::bail!("Unexpected message during Initial Exchange: {:?}", msg_type),
		}
	}

	// 2. Run Generator and Sender pipeline
	let (tx, rx) = file_job_channel();
	let gen_handle = tokio::spawn(async move { generator.run(tx).await });

	let sender = Sender::new(SenderConfig { root: root_path, compress: hello.flags.contains(HelloFlags::COMPRESSION) });

	// Use unbounded channel to avoid blocking_send (panics in tokio context)
	let (data_tx, mut data_rx) = mpsc::unbounded_channel::<Bytes>();

	// Spawn sender - uses unbounded_send which never blocks
	let sender_handle = tokio::spawn(async move { sender.run(rx, |bytes| data_tx.send(bytes).map_err(|_| anyhow::anyhow!("Data channel closed"))).await });

	// Stream data to client (concurrent with sender)
	while let Some(bytes) = data_rx.recv().await {
		v2::write_frame(&mut stdout, &bytes).await?;
	}
	stdout.flush().await?;

	let (total_files, total_bytes) = gen_handle.await??;
	sender_handle.await??;

	// Send DONE
	let done = v2::Done { files_ok: total_files, files_err: 0, bytes: total_bytes, duration_ms: 0 };
	v2::write_frame(&mut stdout, &done.encode()).await?;
	stdout.flush().await?;

	Ok(())
}

/// Handle PUSH mode: client pushes files to server (we are destination)
async fn run_server_push(_hello: v2::Hello, root_path: PathBuf, mut stdin: impl io::AsyncRead + Unpin, mut stdout: impl io::AsyncWrite + Unpin) -> Result<()> {
	let mut receiver = Receiver::new(ReceiverConfig { root: root_path.clone(), block_size: 4096 });

	// 1. Send Initial Exchange (our files metadata)
	// Use unbounded channel to avoid blocking_send (panics in tokio context)
	let (data_tx, mut data_rx) = mpsc::unbounded_channel::<Bytes>();
	let receiver_root = root_path.clone();

	// Spawn scanner - uses unbounded_send which never blocks
	let scan_handle = tokio::spawn(async move {
		let receiver = Receiver::new(ReceiverConfig { root: receiver_root, block_size: 4096 });
		receiver.scan_dest(|bytes| data_tx.send(bytes).map_err(|_| anyhow::anyhow!("Data channel closed"))).await
	});

	// Write data as it arrives (concurrent with scan)
	while let Some(bytes) = data_rx.recv().await {
		v2::write_frame(&mut stdout, &bytes).await?;
	}
	stdout.flush().await?;

	// Wait for scanner to complete
	scan_handle.await??;

	// 2. Receive streaming messages
	loop {
		let (msg_type, payload) = v2::read_frame(&mut stdin).await?;

		if msg_type == MessageType::Done {
			break;
		}

		receiver.handle_message(msg_type, payload).await?;
	}

	// 3. Send DONE
	let done = v2::Done {
		files_ok: receiver.stats().files_ok,
		files_err: receiver.stats().files_err,
		bytes: receiver.stats().bytes_transferred,
		duration_ms: 0,
	};
	v2::write_frame(&mut stdout, &done.encode()).await?;
	stdout.flush().await?;

	Ok(())
}