sftp-server 0.1.0

A library providing a pure-Rust implementation of an SFTP server. It can operate standalone with an embedded SSH server, or act as the SFTP backend for an external SSH server (e.g. openssh). Binary crates should pull in this library and provide their own storage back-end; a minimal usage sketch is included in the crate-level documentation below.
#![allow(unused_parens)]
#![allow(non_upper_case_globals)]
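//! Pure-Rust SFTP server library.  Without the `standalone` feature the
//! server speaks SFTP over stdin/stdout, which is how an external SSH server
//! (e.g. openssh) drives a subsystem; with the `standalone` feature it embeds
//! a thrussh-based SSH server instead.
//!
//! A minimal sketch of a binary crate wiring this up in subsystem mode.
//! `MyBackend` is a hypothetical placeholder for any type implementing
//! `backend::Backend`:
//!
//! ```ignore
//! use sftp_server::Server;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), sftp_server::Error> {
//!     // `MyBackend` is a stand-in for your own storage back-end.
//!     let backend = MyBackend::default();
//!     // The id is used to key per-connection state in standalone mode.
//!     let mut server = Server::new(backend, 0);
//!     // Read SFTP packets from stdin and write responses to stdout.
//!     server.run().await
//! }
//! ```
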
#[macro_use] extern crate async_trait;

use std::collections::HashMap;
use std::collections::VecDeque;
use std::sync::Arc;

use futures::executor::block_on;
#[cfg(feature = "standalone")]
use futures::future::{ready, Ready};

use tokio::io::AsyncReadExt;
use tokio::io::AsyncSeekExt;
use tokio::io::AsyncWriteExt;
use tokio::io::SeekFrom;
use tokio::sync::Mutex;

use bincode::Options;

#[cfg(feature = "standalone")]
use thrussh::{
	ChannelId,
	server::{
		Auth,
		Handle,
		Handler,
		Response,
		Session
	}
};

use uuid::Uuid;

use sftp_protocol::packet;
use sftp_protocol::packet::name::File;
use sftp_protocol::packet::open::OpenFlags;
use sftp_protocol::packet::PayloadTrait;
use sftp_protocol::packet::status::StatusType;
use sftp_protocol::Packet;
use sftp_protocol::Payload;
use sftp_protocol::parser::Parser;

pub use sftp_protocol::common;

pub mod backend;
use backend::Backend;
mod error;
pub use error::Error;
pub mod file;
use file::OpenFile;

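/// SFTP server state, generic over a storage back-end `B`.  Open file and
/// directory handles are tracked in shared maps keyed by `Uuid`; with the
/// `standalone` feature the same type also acts as the thrussh server factory
/// and per-connection handler.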
#[derive(Clone)]
pub struct Server<B: Backend> {
	backend: B,

	#[cfg(feature = "standalone")]
	pub clients: Arc<std::sync::Mutex<HashMap<(usize, ChannelId), Handle>>>,
	pub id: usize,

	#[allow(clippy::type_complexity)]
	// In order to support large directories without blowing up, this may end up needing to hold a Stream<Item=File> instead of VecDeque<File>; for now this is fine.
	open_dirs: Arc<Mutex<HashMap<Uuid, (VecDeque<File>, usize)>>>,
	open_files: Arc<Mutex<HashMap<Uuid, OpenFile>>>,
	#[cfg(feature = "standalone")]
	parsers: HashMap<ChannelId, Parser>,
}

impl<B: Backend> Server<B> {
	pub fn new(backend: B, id: usize) -> Self /* {{{ */ {
		Self{
			backend,
			#[cfg(feature = "standalone")]
			clients: Arc::new(std::sync::Mutex::new(HashMap::new())),
			id,
			open_dirs: Arc::new(Mutex::new(HashMap::new())),
			open_files: Arc::new(Mutex::new(HashMap::new())),
			#[cfg(feature = "standalone")]
			parsers: HashMap::new(),
		}
	} // }}}

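	/// Dispatches a single decoded SFTP request to the appropriate backend
	/// operation and builds the response packet.  The `Init` reply advertises
	/// protocol version 3, so requests are handled per that revision.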
	async fn process_request(&self, input: Packet) -> Result<Packet, Error> /* {{{ */ {
		let output = match input.payload {
			Payload::Init(_) => Payload::version(3, vec![]).into_packet(),
			Payload::Version(_) => unreachable!(),
			Payload::Open(r) => /* {{{ */ {
				let path = r.path;
				let result = self.backend.open(
					&path,
					r.pflags.contains(OpenFlags::Read),
					r.pflags.contains(OpenFlags::Write),
					r.pflags.contains(OpenFlags::Append),
					r.pflags.contains(OpenFlags::Create),
					r.pflags.contains(OpenFlags::Truncate),
					r.pflags.contains(OpenFlags::Exclude)
				).await;
				let response = match result {
					Ok(v) => {
						let response = Payload::handle(r.id);
						let mut state = self.open_files.lock().await;
						state.insert(response.handle, v);
						Payload::Handle(response)
					},
					Err(e) => {
						eprintln!("!!! Failed to open file: {:?}", e);
						Payload::status(r.id, StatusType::Failure, format!("Failed to open file: {}", e))
					}
				};
				response.into_packet()
			}, // }}}
			Payload::Close(r) => /* {{{ */ {
				let mut files = self.open_files.lock().await;
				let response = match files.remove(&r.handle) {
					Some(_) => Payload::status(r.id, StatusType::OK, "OK"),
					None => match self.open_dirs.lock().await.remove(&r.handle) {
						Some(_) => Payload::status(r.id, StatusType::OK, "OK"),
						None => Payload::status(r.id, StatusType::NoSuchFile, format!("Handle {} does not exist", &r.handle))
					}
				};
				response.into_packet()
			}, // }}}
			Payload::Read(r) => /* {{{ */ {
				let mut state = self.open_files.lock().await;
				let response = match state.get_mut(&r.handle) {
					Some(ref mut file) => {
						let mut packet = Payload::data_with_size(r.id, r.len);
						file.seek(SeekFrom::Start(r.offset)).await?;
						let count = file.read(&mut packet.data).await?;
						if(count == 0) {
							Payload::status(r.id, StatusType::EOF, "EOF")
						} else {
							packet.data.truncate(count);
							Payload::Data(packet)
						}
					},
					None => Payload::status(r.id, StatusType::NoSuchFile, "No such file")
				};
				response.into_packet()
			}, // }}}
			Payload::Write(r) => /* {{{ */ {
				let mut state = self.open_files.lock().await;
				let response = match state.get_mut(&r.handle) {
					Some(ref mut file) => {
						// TODO:  When attempting to seek past the end of the existing file, zero-fill the gap
						file.seek(SeekFrom::Start(r.offset)).await?;
						let mut written = 0;
						while(written < r.data.len()) {
							let count = file.write(&r.data[written..]).await?;
							if(count == 0) {
								// A zero-byte write means no further progress can be made;
								// bail out rather than looping forever.
								break;
							}
							written += count;
						}
						if(written < r.data.len()) {
							Payload::status(r.id, StatusType::EOF, "EOF")
						} else {
							Payload::status(r.id, StatusType::OK, "OK")
						}
					},
					None => Payload::status(r.id, StatusType::NoSuchFile, "No such file")
				};
				response.into_packet()
			}, // }}}
			Payload::Lstat(r) => /* {{{ */ {
				// TODO:  Don't follow symlinks
				let mut attrs = Payload::attrs(r.id);
				attrs.attrs = self.backend.metadata(&r.path).await?.into();
				attrs.into_packet()
			}, // }}}
			Payload::Fstat(r) => /* {{{ */ {
				let open_files = self.open_files.lock().await;
				let response = match open_files.get(&r.handle) {
					Some(v) => {
						let mut attrs = Payload::attrs(r.id);
						attrs.attrs = v.metadata.clone().into();
						Payload::Attrs(attrs)
					},
					None => Payload::status(r.id, StatusType::NoSuchFile, "Handle not found")
				};
				response.into_packet()
			}, // }}}
			Payload::SetStat(r) => /* {{{ */ {
				let response = match self.backend.set_metadata(&r.path, r.attrs.get_uid_gid(), r.attrs.get_permissions(), r.attrs.get_atime_mtime()).await {
					Ok(_) => Payload::status(r.id, StatusType::OK, "OK"),
					Err(e) => {
						eprintln!("!!! Failed to set metadata on {}:  {:?}", &r.path, e);
						Payload::status(r.id, StatusType::Failure, format!("Failed to set metadata: {}", e))
					}
				};
				response.into_packet()
			}, // }}}
			Payload::FSetStat(r) => /* {{{ */ {
				let open_files = self.open_files.lock().await;
				let response = match open_files.get(&r.handle) {
					Some(v) => match self.backend.set_metadata(&v.metadata.path, r.attrs.get_uid_gid(), r.attrs.get_permissions(), r.attrs.get_atime_mtime()).await {
						Ok(_) => Payload::status(r.id, StatusType::OK, "OK"),
						Err(e) => {
							eprintln!("!!! Failed to set metadata on {}:  {}", &v.metadata.path, e);
							Payload::status(r.id, StatusType::Failure, format!("Failed to set metadata: {}", e))
						}
					},
					None => Payload::status(r.id, StatusType::NoSuchFile, "Handle not found")
				};
				response.into_packet()
			}, // }}}
			Payload::OpenDir(r) => /* {{{ */ {
				let response = Payload::handle(r.id);
				let contents = self.backend.list(&r.path).await?;
				self.open_dirs.lock().await.insert(
					response.handle, (
						contents.into_iter().map(|f| File::new((&f).into(), f.path)).collect(),
						0
					)
				);
				response.into_packet()
			}, // }}}
			Payload::ReadDir(r) => /* {{{ */ {
				let mut state = self.open_dirs.lock().await;
				match state.get_mut(&r.handle) {
					Some((ref mut files, ref mut index)) => {
						// TODO:  I would imagine that there's a limit on the response size.  We'll end up needing to chunk.
						if(*index >= files.len()) {
							Payload::status(r.id, StatusType::EOF, "EOF").into_packet()
						} else {
							let mut payload = Payload::name(r.id);
							if(!files.is_empty()) {
								payload.files = files.clone().into();
							}
							*index += payload.files.len();
							payload.into_packet()
						}
					},
					None => Payload::status(r.id, StatusType::EOF, "EOF").into_packet()
				}
			}, // }}}
			Payload::Remove(r) => /* {{{ */ {
				let response = match self.backend.delete_file(&r.path).await {
					Ok(_) => Payload::status(r.id, StatusType::OK, "OK"),
					Err(e) => Payload::status(r.id, StatusType::Failure, format!("Failed to delete file: {}", e))
				};
				response.into_packet()
			}, // }}}
			Payload::MkDir(r) => /* {{{ */ {
				let response = match self.backend.mkdir(&r.path).await {
					Ok(_) => Payload::status(r.id, StatusType::OK, "OK"),
					Err(e) => Payload::status(r.id, StatusType::Failure, format!("Failed to create directory: {}", e))
				};
				response.into_packet()
			}, // }}}
			Payload::RmDir(r) => /* {{{ */ {
				let response = match self.backend.rmdir(&r.path).await {
					Ok(_) => Payload::status(r.id, StatusType::OK, "OK"),
					Err(e) => Payload::status(r.id, StatusType::Failure, format!("Failed to delete directory: {}", e))
				};
				response.into_packet()
			}, // }}}
			Payload::RealPath(r) => /* {{{ */ {
				let mut name = packet::name::Name::new(r.id);
				let normalized = self.backend.normalize_path(&r.path)?;
				name.append_file(&normalized, self.backend.metadata(&r.path).await?.into());
				name.into_packet()
			}, // }}}
			Payload::Stat(r) => /* {{{ */ {
				// TODO:  Follow symlinks
				match self.backend.metadata(&r.path).await {
					Ok(v) => {
						let mut attrs = Payload::attrs(r.id);
						attrs.attrs = v.into();
						attrs.into_packet()
					},
					Err(Error::IO(e)) if e.kind() == std::io::ErrorKind::InvalidInput => Payload::status(r.id, StatusType::NoSuchFile, "File not found").into_packet(),
					Err(e) => Payload::status(r.id, StatusType::Failure, e.to_string()).into_packet()
				}
			}, // }}}
			Payload::Rename(r) => /* {{{ */ {
				let response = match self.backend.rename(&r.oldpath, &r.newpath).await {
					Ok(_) => Payload::status(r.id, StatusType::OK, "OK"),
					Err(e) => Payload::status(r.id, StatusType::Failure, format!("Failed to rename: {}", e))
				};
				response.into_packet()
			}, // }}}
			Payload::ReadLink(r) => /* {{{ */ {
				match self.backend.readlink(&r.path).await {
					Ok(Some(path)) => {
						let mut response = Payload::name(r.id);
						response.append_file(&path, self.backend.metadata(&path).await?.into());
						response.into_packet()
					},
					Ok(None) => Payload::status(r.id, StatusType::NoSuchFile, "File not found").into_packet(),
					Err(e) => Payload::status(r.id, StatusType::Failure, e.to_string()).into_packet()
				}
			}, // }}}
			Payload::Symlink(r) => /* {{{ */ {
				let response = match self.backend.mklink(&r.targetpath, &r.linkpath, LinkType::Symbolic).await {
					Ok(_) => Payload::status(r.id, StatusType::OK, "OK"),
					Err(Error::IO(e)) if e.kind() == std::io::ErrorKind::InvalidInput => Payload::status(r.id, StatusType::NoSuchFile, "File not found"),
					Err(e) => Payload::status(r.id, StatusType::Failure, e.to_string())
				};
				response.into_packet()
			}, // }}}
			Payload::Status(_) => unreachable!(),
			Payload::Handle(_) => unreachable!(),
			Payload::Data(_) => unreachable!(),
			Payload::Name(_) => unreachable!(),
			Payload::Attrs(_) => unreachable!(),
			Payload::Extended(_) => unimplemented!(),
			Payload::ExtendedReply(_) => unreachable!()
		};
		Ok(output)
	} // }}}

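	/// Processes one parsed packet and returns the serialized response bytes.
	/// Responses are encoded with bincode configured for big-endian,
	/// fixed-width integers, matching the SFTP wire encoding.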
	async fn process_packet(&mut self, packet: Packet) -> Result<Option<Vec<u8>>, Error> /* {{{ */ {
		let se = bincode::DefaultOptions::new().with_big_endian().with_fixint_encoding();
		let response = self.process_request(packet).await?;
		let response_bytes = se.serialize(&response)?;
		Ok(Some(response_bytes))
	} // }}}

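	/// Subsystem mode (no embedded SSH server):  reads raw SFTP packets from
	/// stdin and writes responses to stdout, as expected when an external SSH
	/// server (e.g. openssh) invokes the binary as its sftp subsystem.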
	#[cfg(not(feature = "standalone"))]
	pub async fn run(&mut self) -> Result<(), Error> /* {{{ */ {
		let mut buf = [0u8; 8192];
		let mut packet_stream = Parser::default();
		let mut stdin = tokio::io::stdin();
		let mut stdout = tokio::io::stdout();
		// TODO:  This needs a way to be terminated
		loop {
			let count = stdin.read(&mut buf).await?;
			if(count == 0) {
				tokio::time::sleep(std::time::Duration::from_millis(10)).await;
				continue;
			}
			if let Err(e) = packet_stream.write(&buf[0..count]) {
				eprintln!("!!! run():  Failed to write packet to buffer:  {:?}", e);
				continue;
			}
			let packet = match packet_stream.get_packet() {
				Ok(None) => continue,
				Ok(Some(v)) => v,
				Err(e) => {
					eprintln!("!!! run():  Failed to parse packet:  {:?}", e);
					continue;
				}
			};
			let response = match block_on(self.process_packet(packet)) {
				Ok(Some(v)) => v,
				Ok(None) => continue,
				Err(e) => {
					eprintln!("!!! run():  Failed to process packet:  {:?}", e);
					continue;
				}
			};
			stdout.write_all(&response).await?;
			stdout.flush().await?;
		}
	} // }}}
}

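// Standalone mode:  thrussh calls `new()` once per incoming connection, so
// every connection gets a clone of this server with a unique `id`.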
#[cfg(feature = "standalone")]
impl<B: Backend> thrussh::server::Server for Server<B> {
	type Handler = Self;
	fn new(&mut self, _: Option<std::net::SocketAddr>) -> Self /* {{{ */ {
		let s = self.clone();
		self.id += 1;
		s
	} // }}}
}

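// Standalone mode:  per-connection SSH event handler.  Incoming channel data
// is buffered in a per-channel `Parser`; once a complete SFTP packet is
// available it is processed (synchronously, via `block_on`) and the response
// is sent back on the same channel.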
#[cfg(feature = "standalone")]
impl<B: Backend> Handler for Server<B> {
	type Error = Error;
	type FutureAuth = Ready<Result<(Self, Auth), Error>>;
	type FutureUnit = Ready<Result<(Self, Session), Error>>;
	type FutureBool = Ready<Result<(Self, Session, bool), Error>>;

	fn finished_auth(self, auth: Auth) -> Self::FutureAuth /* {{{ */ {
		ready(Ok((self, auth)))
	} // }}}

	fn finished_bool(self, result: bool, session: Session) -> Self::FutureBool /* {{{ */ {
		ready(Ok((self, session, result)))
	} // }}}

	fn finished(self, session: Session) -> Self::FutureUnit /* {{{ */ {
		ready(Ok((self, session)))
	} // }}}

	fn channel_open_session(self, channel: ChannelId, session: Session) -> Self::FutureUnit /* {{{ */ {
		{
			let mut clients = self.clients.lock().unwrap();
			clients.insert((self.id, channel), session.handle());
		}
		self.finished(session)
	} // }}}

	fn auth_publickey(self, _: &str, _: &thrussh_keys::key::PublicKey) -> Self::FutureAuth /* {{{ */ {
		// TODO:  Actually validate authentication.
		eprintln!("auth key success");
		self.finished_auth(Auth::Accept)
	} // }}}

	fn auth_keyboard_interactive(self, user: &str, submethods: &str, response: Option<Response>) -> Self::FutureAuth /* {{{ */ {
		// TODO:  Actually validate authentication.
		eprintln!("auth_keyboard_interactive('{}', '{}', {:?})", user, submethods, response);
		eprintln!("auth int success");
		self.finished_auth(Auth::Accept)
	} // }}}

	fn data(mut self, channel: ChannelId, data: &[u8], mut session: Session) -> Self::FutureUnit /* {{{ */ {
		let stream = self.parsers.entry(channel).or_default();
		if let Err(e) = stream.write(data) {
			return ready(Err(e.into()));
		}
		let packet = match stream.get_packet() {
			Ok(None) => return self.finished(session),
			Ok(Some(v)) => v,
			Err(e) => {
				eprintln!("!!! data():  Failed to parse packet in channel {:?}:  {:?}", channel, e);
				return ready(Err(e.into()));
			}
		};
		let response = match block_on(self.process_packet(packet)) {
			Ok(Some(v)) => v,
			Ok(None) => return self.finished(session),
			Err(e) => {
				eprintln!("!!! data():  Failed to process packet in channel {:?}:  {:?}", channel, e);
				return ready(Err(e));
			}
		};
		session.data(channel, response.into());
		self.finished(session)
	} // }}}

	fn subsystem_request(self, _channel: ChannelId, _name: &str, session: Session) -> Self::FutureUnit /* {{{ */ {
		// TODO:  We should keep state at the Server level for whether or not
		//    the SFTP subsystem has indeed been requested for a given channel
		//    ID.
		self.finished(session)
	} // }}}
}

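/// The kind of link the backend is asked to create; the `Symlink` request
/// handler currently only creates `Symbolic` links.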
#[derive(Debug)]
pub enum LinkType {
	Symbolic,
	Hard
}