use std::{
any::{Any, TypeId},
ffi::OsString,
sync::OnceLock,
thread,
time::Duration
};
use hashbrown::HashMap;
use parking_lot::Mutex;
use tokio::sync::{
broadcast,
mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
oneshot
};
use windows_service::{
define_windows_service,
service::{
ServiceControl, ServiceControlAccept, ServiceExitCode, ServiceState,
ServiceStatus, ServiceType
},
service_control_handler::{
self, ServiceControlHandlerResult, ServiceStatusHandle
},
service_dispatcher
};
use windows_registry::LOCAL_MACHINE;
#[cfg(feature = "wait-for-debugger")]
use dbgtools_win::debugger;
use crate::{
err::{CbErr, Error},
lumberjack::LumberJack,
rt::{Demise, RunEnv, SrvAppRt, SvcEvt, rttype}
};
const SERVICE_TYPE: ServiceType = ServiceType::OWN_PROCESS;
const SERVICE_STARTPENDING_TIME: Duration = Duration::from_secs(300);
const SERVICE_STOPPENDING_TIME: Duration = Duration::from_secs(30);
enum ToSvcMsg {
Starting(u32),
Started,
Stopping(u32),
Stopped
}
pub(crate) struct Xfer {
svcname: String,
tx_fromsvc: oneshot::Sender<Result<HandshakeMsg, Error>>,
passthrough_init: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
passthrough_term: HashMap<TypeId, Box<dyn Any + Send + Sync>>
}
static CELL: OnceLock<Mutex<Option<Xfer>>> = OnceLock::new();
struct HandshakeMsg {
tx: UnboundedSender<ToSvcMsg>,
tx_svcevt: broadcast::Sender<SvcEvt>,
rx_svcevt: broadcast::Receiver<SvcEvt>,
passthrough_init: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
passthrough_term: HashMap<TypeId, Box<dyn Any + Send + Sync>>
}
pub struct ServiceReporter {
tx: UnboundedSender<ToSvcMsg>
}
impl super::StateReporter for ServiceReporter {
fn starting(&self, checkpoint: u32, _status: &str) {
if let Err(e) = self.tx.send(ToSvcMsg::Starting(checkpoint)) {
log::error!("Unable to send Starting message; {e}");
}
}
fn started(&self) {
if let Err(e) = self.tx.send(ToSvcMsg::Started) {
log::error!("Unable to send Started message; {e}");
}
}
fn stopping(&self, checkpoint: u32, _status: &str) {
if let Err(e) = self.tx.send(ToSvcMsg::Stopping(checkpoint)) {
log::error!("Unable to send Stopping message; {e}");
}
}
fn stopped(&self) {
if let Err(e) = self.tx.send(ToSvcMsg::Stopped) {
log::error!("Unable to send Stopped message; {e}");
}
}
}
#[allow(clippy::missing_panics_doc)]
pub fn run<ApEr>(
svcname: &str,
st: SrvAppRt<ApEr>,
passthrough_init: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
passthrough_term: HashMap<TypeId, Box<dyn Any + Send + Sync>>
) -> Result<(), CbErr<ApEr>>
where
ApEr: Send + 'static + std::fmt::Debug
{
#[cfg(feature = "wait-for-debugger")]
{
debugger::wait_for_then_break();
debugger::output("Hello, debugger");
}
let (tx_fromsvc, rx_fromsvc) = oneshot::channel();
let xfer = Xfer {
svcname: svcname.into(),
tx_fromsvc,
passthrough_init,
passthrough_term
};
CELL.get_or_init(|| Mutex::new(Some(xfer)));
let svcnm = svcname.to_string();
let jh = thread::Builder::new()
.name("svcapp".into())
.spawn(move || srvapp_thread(st, svcnm, rx_fromsvc))?;
service_dispatcher::start(svcname, ffi_service_main)?;
match jh.join() {
Ok(res) => {
tracing::trace!("srvapp_thread::join res={res:?}");
match res {
Ok(()) => Ok(()),
Err(be) => Err(be)
}
}
Err(e) => {
tracing::error!("srvapp_thread() could not be joined; {e:?}");
let msg = format!("Unable to join srvapp_thread(); {e:?}");
Err(CbErr::Lib(Error::Internal(msg)))
}
}
}
fn srvapp_thread<ApEr>(
st: SrvAppRt<ApEr>,
svcname: String,
rx_fromsvc: oneshot::Receiver<Result<HandshakeMsg, Error>>
) -> Result<(), CbErr<ApEr>>
where
ApEr: Send + std::fmt::Debug
{
let Ok(res) = rx_fromsvc.blocking_recv() else {
panic!("Unable to receive handshake");
};
let Ok(HandshakeMsg {
tx,
tx_svcevt,
rx_svcevt,
passthrough_init,
passthrough_term
}) = res
else {
panic!("Unable to receive handshake");
};
let sr = ServiceReporter { tx };
let sr = super::ServiceReporter::new(sr);
let re = RunEnv::Service(Some(svcname));
match st {
SrvAppRt::Sync {
svcevt_handler,
rt_handler
} => rttype::sync_main(rttype::SyncMainParams {
re,
svcevt_handler,
rt_handler,
sr,
svcevt_ch: Some((tx_svcevt, rx_svcevt)),
passthrough_init,
passthrough_term,
test_mode: false
}),
#[cfg(feature = "tokio")]
SrvAppRt::Tokio {
rtbldr,
svcevt_handler,
rt_handler
} => rttype::tokio_main(
rtbldr,
rttype::TokioMainParams {
re,
svcevt_handler,
rt_handler,
sr,
svcevt_ch: Some((tx_svcevt, rx_svcevt)),
passthrough_init,
passthrough_term
}
),
#[cfg(feature = "rocket")]
SrvAppRt::Rocket {
svcevt_handler,
rt_handler
} => rttype::rocket_main(rttype::RocketMainParams {
re,
svcevt_handler,
rt_handler,
sr,
svcevt_ch: Some((tx_svcevt, rx_svcevt)),
passthrough_init,
passthrough_term
})
}
}
define_windows_service!(ffi_service_main, my_service_main);
fn take_shared_buffer() -> Xfer {
let Some(x) = CELL.get() else {
panic!("Unable to get shared buffer");
};
x.lock().take().unwrap()
}
struct InitRes {
handshake_reply: HandshakeMsg,
rx_tosvc: UnboundedReceiver<ToSvcMsg>,
status_handle: ServiceStatusHandle
}
#[allow(clippy::needless_pass_by_value)]
fn my_service_main(_arguments: Vec<OsString>) {
let Xfer {
svcname,
tx_fromsvc,
passthrough_init,
passthrough_term
} = take_shared_buffer();
match svcinit(&svcname, passthrough_init, passthrough_term) {
Ok(InitRes {
handshake_reply,
rx_tosvc,
status_handle
}) => {
if tx_fromsvc.send(Ok(handshake_reply)).is_err() {
log::error!("Unable to send handshake message");
return;
}
svcloop(rx_tosvc, status_handle);
}
Err(e) => {
if tx_fromsvc.send(Err(e)).is_err() {
log::error!("Unable to send handshake message");
}
}
}
}
fn svcinit(
svcname: &str,
passthrough_init: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
passthrough_term: HashMap<TypeId, Box<dyn Any + Send + Sync>>
) -> Result<InitRes, Error> {
LumberJack::from_winsvc(svcname)?.service().init()?;
if let Ok(svcparams) = get_service_params_subkey(svcname)
&& let Ok(wd) = svcparams.get_string("WorkDir")
{
std::env::set_current_dir(wd).map_err(|e| {
Error::internal(format!("Unable to switch to WorkDir; {e}"))
})?;
}
let (tx_tosvc, rx_tosvc) = unbounded_channel();
let (tx_svcevt, rx_svcevt) = broadcast::channel(16);
let tx_svcevt2 = tx_svcevt.clone();
let event_handler = move |control_event| -> ServiceControlHandlerResult {
match control_event {
ServiceControl::Interrogate => {
tracing::debug!("svc signal recieved: interrogate");
ServiceControlHandlerResult::NoError
}
ServiceControl::Stop => {
tracing::debug!("svc signal recieved: stop");
if let Err(e) = tx_svcevt2.send(SvcEvt::Shutdown(Demise::Terminated)) {
log::error!("Unable to send SvcEvt::Shutdown from winsvc; {e}");
}
ServiceControlHandlerResult::NoError
}
ServiceControl::Continue => {
tracing::debug!("svc signal recieved: continue");
ServiceControlHandlerResult::NotImplemented
}
ServiceControl::Pause => {
tracing::debug!("svc signal recieved: pause");
ServiceControlHandlerResult::NotImplemented
}
_ => {
tracing::debug!("svc signal recieved: other");
ServiceControlHandlerResult::NotImplemented
}
}
};
let status_handle =
service_control_handler::register(svcname, event_handler)?;
if let Err(e) = status_handle.set_service_status(ServiceStatus {
service_type: SERVICE_TYPE,
current_state: ServiceState::StartPending,
controls_accepted: ServiceControlAccept::empty(),
exit_code: ServiceExitCode::Win32(0),
checkpoint: 0,
wait_hint: SERVICE_STARTPENDING_TIME,
process_id: None
}) {
log::error!("Unable to set the sevice status to 'start pending 0'; {e}");
Err(e)?;
}
Ok(InitRes {
handshake_reply: HandshakeMsg {
tx: tx_tosvc,
tx_svcevt,
rx_svcevt,
passthrough_init,
passthrough_term
},
rx_tosvc,
status_handle
})
}
fn svcloop(
mut rx_tosvc: UnboundedReceiver<ToSvcMsg>,
status_handle: ServiceStatusHandle
) {
tracing::trace!("enter app state monitoring loop");
loop {
let Some(ev) = rx_tosvc.blocking_recv() else {
log::error!("Sender endpoints unexpectedly disappeared");
break;
};
match ev {
ToSvcMsg::Starting(checkpoint) => {
tracing::debug!("app reported that it is running");
if let Err(e) = status_handle.set_service_status(ServiceStatus {
service_type: SERVICE_TYPE,
current_state: ServiceState::StartPending,
controls_accepted: ServiceControlAccept::empty(),
exit_code: ServiceExitCode::Win32(0),
checkpoint,
wait_hint: SERVICE_STARTPENDING_TIME,
process_id: None
}) {
log::error!(
"Unable to set service status to 'start pending {checkpoint}'; \
{e}"
);
}
}
ToSvcMsg::Started => {
if let Err(e) = status_handle.set_service_status(ServiceStatus {
service_type: SERVICE_TYPE,
current_state: ServiceState::Running,
controls_accepted: ServiceControlAccept::STOP,
exit_code: ServiceExitCode::Win32(0),
checkpoint: 0,
wait_hint: Duration::default(),
process_id: None
}) {
log::error!("Unable to set service status to 'started'; {e}");
}
}
ToSvcMsg::Stopping(checkpoint) => {
tracing::debug!("app is shutting down");
if let Err(e) = status_handle.set_service_status(ServiceStatus {
service_type: SERVICE_TYPE,
current_state: ServiceState::StopPending,
controls_accepted: ServiceControlAccept::empty(),
exit_code: ServiceExitCode::Win32(0),
checkpoint,
wait_hint: SERVICE_STOPPENDING_TIME,
process_id: None
}) {
log::error!(
"Unable to set service status to 'stop pending {checkpoint}'; {e}"
);
}
}
ToSvcMsg::Stopped => {
if let Err(e) = status_handle.set_service_status(ServiceStatus {
service_type: SERVICE_TYPE,
current_state: ServiceState::Stopped,
controls_accepted: ServiceControlAccept::empty(),
exit_code: ServiceExitCode::Win32(0),
checkpoint: 0,
wait_hint: Duration::default(),
process_id: None
}) {
log::error!("Unable to set service status to 'stopped'; {e}");
}
break;
}
}
}
tracing::trace!("service terminated");
}
const SVCPATH: &str = "SYSTEM\\CurrentControlSet\\Services";
const PARAMS: &str = "Parameters";
pub fn read_service_subkey(
service_name: &str
) -> Result<windows_registry::Key, Error> {
let key = LOCAL_MACHINE.open(SVCPATH)?.open(service_name)?;
Ok(key)
}
pub fn write_service_subkey(
service_name: &str
) -> Result<windows_registry::Key, Error> {
let key = LOCAL_MACHINE.open(SVCPATH)?.create(service_name)?;
Ok(key)
}
pub fn create_service_params(
service_name: &str
) -> Result<windows_registry::Key, Error> {
let key = LOCAL_MACHINE
.open(SVCPATH)?
.open(service_name)?
.create(PARAMS)?;
Ok(key)
}
pub fn get_service_params_subkey(
service_name: &str
) -> Result<windows_registry::Key, Error> {
let key = LOCAL_MACHINE
.open(SVCPATH)?
.open(service_name)?
.create(PARAMS)?;
Ok(key)
}
pub fn get_service_param(
service_name: &str
) -> Result<windows_registry::Key, Error> {
let key = LOCAL_MACHINE
.open(SVCPATH)?
.open(service_name)?
.open(PARAMS)?;
Ok(key)
}