use std::{
collections::{HashMap, hash_map},
io::Read as _,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
};
use kak_tree_sitter_config::Config;
use mio::{
Interest, Poll, Token, Waker,
net::{UnixListener, UnixStream},
};
use crate::{
error::OhNo,
kakoune::{
buffer::BufferId,
selection::Sel,
session::{Session, SessionTracker},
},
protocol::request::{self, Metadata, Request},
server::handler::Command,
};
use super::{
fifo::Fifo,
handler::{CommandSender, Handler},
resources::ServerResources,
triple_buffer::TripleBuffer,
};
#[derive(Debug)]
pub enum Feedback {
Ok,
ShouldExit,
}
pub struct IOHandler {
is_standalone: bool,
with_highlighting: bool,
resources: ServerResources,
fifos: HashMap<Token, (Metadata, Fifo, TripleBuffer)>,
tkn_buffer_ids: HashMap<BufferId, Token>,
poll: Poll,
unix_listener: UnixListener,
connections: HashMap<Token, BufferedClient>,
command_sender: CommandSender,
}
impl IOHandler {
const WAKE_TKN: Token = Token(0);
const UNIX_LISTENER_TKN: Token = Token(1);
pub fn new(
config: &Config,
is_standalone: bool,
with_highlighting: bool,
resources: ServerResources,
poll: Poll,
) -> Result<Self, OhNo> {
let mut unix_listener = UnixListener::bind(resources.paths().socket_path())
.map_err(|err| OhNo::CannotStartServer { err })?;
let connections = HashMap::default();
let fifos = HashMap::new();
let tkn_buffer_ids = HashMap::new();
poll
.registry()
.register(
&mut unix_listener,
Self::UNIX_LISTENER_TKN,
Interest::READABLE,
)
.map_err(|err| OhNo::PollError { err })?;
let command_sender = Handler::create(config, with_highlighting);
Ok(Self {
is_standalone,
with_highlighting,
resources,
fifos,
tkn_buffer_ids,
poll,
unix_listener,
connections,
command_sender,
})
}
pub fn start(&mut self, session_tracker: &mut SessionTracker, quit: Arc<AtomicBool>) {
let mut events = mio::Events::with_capacity(64);
log::debug!("starting event loop");
'event_loop: loop {
match self.poll.poll(&mut events, None) {
Err(err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
Err(err) => {
log::error!("error while polling: {err}");
break;
}
_ => (),
}
if quit.load(Ordering::Relaxed) {
break 'event_loop;
}
for ev in &events {
match ev.token() {
Self::UNIX_LISTENER_TKN if ev.is_readable() => {
if let Err(err) = self.unix_listener_accept() {
log::error!("error while accepting UNIX connection: {err}");
}
}
tkn if ev.is_readable() => {
if let Feedback::ShouldExit = self.dispatch_read_token(session_tracker, tkn) {
break 'event_loop;
}
}
_ => (),
}
}
}
log::debug!("poll loop exited");
}
pub fn waker(&self) -> Result<Arc<Waker>, OhNo> {
let waker =
Waker::new(self.poll.registry(), Self::WAKE_TKN).map_err(|err| OhNo::PollError { err })?;
Ok(Arc::new(waker))
}
fn unix_listener_accept(&mut self) -> Result<(), OhNo> {
loop {
let (mut client, _) = match self.unix_listener.accept() {
Ok(conn) => conn,
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => break,
Err(err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
Err(err) => return Err(OhNo::UnixSocketConnectionError { err }),
};
log::debug!("client connected: {client:?}");
let token = self.resources.tokens().lock().expect("tokens").create();
let res = self
.poll
.registry()
.register(&mut client, token, Interest::READABLE)
.map_err(|err| OhNo::PollError { err });
if let Err(err) = res {
self
.resources
.tokens()
.lock()
.expect("tokens")
.recycle(token);
return Err(err);
}
log::debug!("{client:?} will be using token {token:?}");
self.connections.insert(token, BufferedClient::new(client));
}
Ok(())
}
fn dispatch_read_token(
&mut self,
session_tracker: &mut SessionTracker,
token: Token,
) -> Feedback {
match self.read_unix_client(session_tracker, token) {
Ok(Some(feedback)) => return feedback,
Err(err) => {
log::error!("error while reading from UNIX client (token = {token:?}): {err}");
return Feedback::Ok;
}
_ => (),
}
if let Err(err) = self.read_buffer(token) {
log::error!("error while reading buffer: (token = {token:?}): {err}");
}
Feedback::Ok
}
fn read_unix_client(
&mut self,
session_tracker: &mut SessionTracker,
tkn: Token,
) -> Result<Option<Feedback>, OhNo> {
let Some(client) = self.connections.get_mut(&tkn) else {
return Ok(None);
};
let Some(s) = client.read()? else {
return Ok(None);
};
let req = Request::from_json(s)?;
self.process_req(session_tracker, req).map(Some)
}
pub fn process_req(
&mut self,
session_tracker: &mut SessionTracker,
req: Request,
) -> Result<Feedback, OhNo> {
log::debug!("processing request: {req:?}");
match req.payload {
request::Payload::SessionBegin => {
let session = req.session();
if session_tracker.tracks(session) {
log::warn!("session {session} already tracked");
return Ok(Feedback::Ok);
}
log::info!("registering session {}", req.session());
let session = Session::new(req.session())?;
session_tracker.track(session);
self.command_sender.send(Command::SessionInit {
metadata: req.metadata,
})?;
}
request::Payload::SessionEnd => {
log::info!("session {} exit", req.session());
self.command_sender.send(Command::SessionEnd {
metadata: req.metadata.clone(),
})?;
session_tracker.untrack(req.session());
let feedback = if !self.is_standalone && session_tracker.is_empty() {
log::info!("last session exited; stopping the server…");
Feedback::ShouldExit
} else {
Feedback::Ok
};
return Ok(feedback);
}
request::Payload::Reload => {
log::info!("reloading configuration, grammars and queries");
self.reload();
}
request::Payload::Shutdown => {
log::info!("shutting down");
self.command_sender.send(Command::Shutdown)?;
return Ok(Feedback::ShouldExit);
}
request::Payload::BufferMetadata { lang } => {
let metadata = req.metadata;
let id = metadata.to_buffer_id()?;
log::info!("buffer metadata {metadata:?} ({lang})");
let (fifo_path, sentinel) = match self.tkn_buffer_ids.entry(id.clone()) {
hash_map::Entry::Occupied(entry) => {
let tkn = *entry.get();
let (_, fifo, _) = self.fifos.get(&tkn).ok_or(OhNo::UnknownToken { tkn })?;
(fifo.path().to_owned(), fifo.sentinel().to_owned())
}
hash_map::Entry::Vacant(entry) => {
let fifo = self.resources.new_fifo()?;
let tkn = fifo.token();
let ret = (fifo.path().to_owned(), fifo.sentinel().to_owned());
entry.insert(tkn);
self
.fifos
.insert(tkn, (metadata.clone(), fifo, TripleBuffer::new()));
ret
}
};
self.command_sender.send(Command::BufferMetadata {
metadata,
lang,
fifo_path,
sentinel,
})?;
}
request::Payload::BufferClose => {
let metadata = req.metadata;
let id = metadata.to_buffer_id()?;
log::info!("buffer close {metadata:?}");
if let Some(tkn) = self.tkn_buffer_ids.remove(&id) {
self.fifos.remove(&tkn);
}
self
.command_sender
.send(Command::BufferClose { metadata })?;
}
request::Payload::TextObjects {
pattern,
selections,
mode,
} => {
let metadata = req.metadata;
let selections = Sel::parse_many(&selections);
log::info!(
"text objects for {metadata:?}, pattern {pattern}, mode {mode:?}, selections: {selections:?}"
);
self.command_sender.send(Command::TextObjects {
metadata,
pattern,
selections,
mode,
})?;
}
request::Payload::Nav { selections, dir } => {
let metadata = req.metadata;
log::info!("nav for buffer {metadata:?}, dir {dir:?}",);
let selections = Sel::parse_many(&selections);
self.command_sender.send(Command::Nav {
metadata,
selections,
dir,
})?;
}
request::Payload::Version => self.command_sender.send(Command::Version {
metadata: req.metadata,
})?,
}
Ok(Feedback::Ok)
}
fn read_buffer(&mut self, tkn: Token) -> Result<(), OhNo> {
let Some((metadata, fifo, triple_buffer)) = self.fifos.get_mut(&tkn) else {
return Err(OhNo::UnknownToken { tkn });
};
let Some(ready_fifo) = fifo.read()?.ready() else {
return Ok(());
};
triple_buffer.writer.write(ready_fifo.as_str());
fifo.clear();
self.command_sender.send(Command::BufferUpdate {
metadata: metadata.clone(),
reader: triple_buffer.reader.clone(),
})?;
Ok(())
}
fn reload(&mut self) {
let config = match Config::load_from_xdg() {
Ok(config) => config,
Err(err) => {
log::error!("reloading config failed: {err}");
return;
}
};
self.command_sender = Handler::create(&config, self.with_highlighting);
}
}
pub struct BufferedClient {
client: UnixStream,
buf: String,
}
impl BufferedClient {
pub fn new(client: UnixStream) -> Self {
Self {
client,
buf: String::default(),
}
}
pub fn read(&mut self) -> Result<Option<&str>, OhNo> {
loop {
match self.client.read_to_string(&mut self.buf) {
Ok(0) => return Ok(Some(self.buf.as_str())),
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => return Ok(None),
Err(err) => return Err(OhNo::UnixSocketReadError { err }),
_ => continue,
}
}
}
}