use super::{
action::TcpServerAction,
state::{Listener, PollRequest, RecvRequest, SendRequest, TcpServerState},
};
use crate::automaton::{
Dispatcher, ModelState, PureModel, RegisterModel, RunnerBuilder, State, Uid,
};
use crate::{
callback,
models::pure::net::tcp::{
action::{Event, ListenerEvent, TcpAction, TcpPollEvents},
state::TcpState,
},
};
use log::warn;
impl RegisterModel for TcpServerState {
fn register<Substate: ModelState>(builder: RunnerBuilder<Substate>) -> RunnerBuilder<Substate> {
builder.register::<TcpState>().model_pure::<Self>()
}
}
impl PureModel for TcpServerState {
type Action = TcpServerAction;
fn process_pure<Substate: ModelState>(
state: &mut State<Substate>,
action: Self::Action,
dispatcher: &mut Dispatcher,
) {
match action {
TcpServerAction::New {
address,
listener,
max_connections,
on_success,
on_error,
on_new_connection,
on_connection_closed,
on_listener_closed,
} => {
state.substate_mut::<TcpServerState>().new_listener(
listener,
max_connections,
on_success,
on_error,
on_new_connection,
on_connection_closed,
on_listener_closed,
);
dispatcher.dispatch(TcpAction::Listen {
listener,
address,
on_success: callback!(|listener: Uid| TcpServerAction::NewSuccess { listener }),
on_error: callback!(|(listener: Uid, error: String)| TcpServerAction::NewError { listener, error })
});
}
TcpServerAction::NewSuccess { listener } => {
let Listener { on_success, .. } =
state.substate::<TcpServerState>().get_listener(&listener);
dispatcher.dispatch_back(on_success, listener);
}
TcpServerAction::NewError { listener, error } => {
let server_state: &mut TcpServerState = state.substate_mut();
let Listener { on_error, .. } = server_state.get_listener(&listener);
dispatcher.dispatch_back(on_error, (listener, error));
server_state.remove_listener(&listener);
}
TcpServerAction::Poll {
uid,
timeout,
on_success,
on_error,
} => {
let server_state: &mut TcpServerState = state.substate_mut();
let objects = server_state.listeners.keys().cloned().collect();
server_state.set_poll_request(PollRequest {
on_success,
on_error,
});
dispatcher.dispatch(TcpAction::Poll {
uid,
objects,
timeout,
on_success: callback!(|(uid: Uid, events: TcpPollEvents)| TcpServerAction::PollSuccess { uid, events } ),
on_error: callback!(|(uid: Uid, error: String)| TcpServerAction::PollError { uid, error } ),
})
}
TcpServerAction::PollSuccess { uid, events } => {
let PollRequest { on_success, .. } =
state.substate_mut::<TcpServerState>().take_poll_request();
process_poll_events(state, dispatcher, events);
dispatcher.dispatch_back(&on_success, uid)
}
TcpServerAction::PollError { uid, error } => {
let PollRequest { on_error, .. } =
state.substate_mut::<TcpServerState>().take_poll_request();
dispatcher.dispatch_back(&on_error, (uid, error))
}
TcpServerAction::AcceptSuccess { connection } => {
let (
listener,
Listener {
max_connections,
on_new_connection,
connections,
..
},
) = state
.substate_mut::<TcpServerState>()
.get_connection_listener_mut(&connection);
if connections.len() > *max_connections {
dispatcher.dispatch(TcpAction::Close {
connection,
on_success: callback!(|connection: Uid| {
TcpServerAction::CloseEventInternal { connection }
}),
})
} else {
dispatcher.dispatch_back(on_new_connection, (*listener, connection))
}
}
TcpServerAction::AcceptTryAgain { connection } => {
let (_, listener_object) = state
.substate_mut::<TcpServerState>()
.get_connection_listener_mut(&connection);
listener_object.remove_connection(&connection)
}
TcpServerAction::AcceptError { connection, error } => {
let (_, listener_object) = state
.substate_mut::<TcpServerState>()
.get_connection_listener_mut(&connection);
warn!("|TCP_SERVER| accept {:?} failed: {:?}", connection, error);
listener_object.remove_connection(&connection)
}
TcpServerAction::Close { connection } => dispatcher.dispatch(TcpAction::Close {
connection,
on_success: callback!(|connection: Uid| TcpServerAction::CloseEventNotify {
connection
}),
}),
TcpServerAction::CloseEventInternal { connection } => {
let (_, listener_object) = state
.substate_mut::<TcpServerState>()
.get_connection_listener_mut(&connection);
listener_object.remove_connection(&connection)
}
TcpServerAction::CloseEventNotify { connection } => {
let (listener, listener_object) = state
.substate_mut::<TcpServerState>()
.get_connection_listener_mut(&connection);
dispatcher.dispatch_back(
&listener_object.on_connection_closed,
(*listener, connection),
);
listener_object.remove_connection(&connection)
}
TcpServerAction::Send {
uid,
connection,
data,
timeout,
on_success,
on_timeout,
on_error,
} => {
state
.substate_mut::<TcpServerState>()
.new_send_request(&uid, connection, on_success, on_timeout, on_error);
dispatcher.dispatch(TcpAction::Send {
uid,
connection,
data,
timeout,
on_success: callback!(|uid: Uid| TcpServerAction::SendSuccess { uid }),
on_timeout: callback!(|uid: Uid| TcpServerAction::SendTimeout { uid }),
on_error: callback!(|(uid: Uid, error: String)| TcpServerAction::SendError { uid, error }),
});
}
TcpServerAction::SendSuccess { uid } => {
let SendRequest { on_success, .. } = state
.substate_mut::<TcpServerState>()
.take_send_request(&uid);
dispatcher.dispatch_back(&on_success, uid)
}
TcpServerAction::SendTimeout { uid } => {
let SendRequest { on_timeout, .. } = state
.substate_mut::<TcpServerState>()
.take_send_request(&uid);
dispatcher.dispatch_back(&on_timeout, uid)
}
TcpServerAction::SendError { uid, error } => {
let SendRequest {
connection,
on_error,
..
} = state
.substate_mut::<TcpServerState>()
.take_send_request(&uid);
dispatcher.dispatch_back(&on_error, (uid, error));
dispatcher.dispatch(TcpAction::Close {
connection,
on_success: callback!(|connection: Uid| TcpServerAction::CloseEventNotify {
connection
}),
});
}
TcpServerAction::Recv {
uid,
connection,
count,
timeout,
on_success,
on_timeout,
on_error,
} => {
state
.substate_mut::<TcpServerState>()
.new_recv_request(&uid, connection, on_success, on_timeout, on_error);
dispatcher.dispatch(TcpAction::Recv {
uid,
connection,
count,
timeout,
on_success: callback!(|(uid: Uid, data: Vec<u8>)| TcpServerAction::RecvSuccess { uid, data }),
on_timeout: callback!(|(uid: Uid, partial_data: Vec<u8>)| TcpServerAction::RecvTimeout { uid, partial_data }),
on_error: callback!(|(uid: Uid, error: String)| TcpServerAction::RecvError { uid, error }),
});
}
TcpServerAction::RecvSuccess { uid, data } => {
let RecvRequest { on_success, .. } = state
.substate_mut::<TcpServerState>()
.take_recv_request(&uid);
dispatcher.dispatch_back(&on_success, (uid, data))
}
TcpServerAction::RecvTimeout { uid, partial_data } => {
let RecvRequest { on_timeout, .. } = state
.substate_mut::<TcpServerState>()
.take_recv_request(&uid);
dispatcher.dispatch_back(&on_timeout, (uid, partial_data))
}
TcpServerAction::RecvError { uid, error } => {
let RecvRequest {
connection,
on_error,
..
} = state
.substate_mut::<TcpServerState>()
.take_recv_request(&uid);
dispatcher.dispatch_back(&on_error, (uid, error));
dispatcher.dispatch(TcpAction::Close {
connection,
on_success: callback!(|connection: Uid| TcpServerAction::CloseEventNotify {
connection
}),
})
}
}
}
}
fn process_poll_events<Substate: ModelState>(
state: &mut State<Substate>,
dispatcher: &mut Dispatcher,
events: TcpPollEvents,
) {
for (listener, ev) in events {
if let Event::Listener(event) = ev {
match event {
ListenerEvent::AcceptPending => {
let connection = state.new_uid();
state
.substate_mut::<TcpServerState>()
.new_connection(connection, listener);
dispatcher.dispatch(TcpAction::Accept {
connection,
listener,
on_success: callback!(|connection: Uid| TcpServerAction::AcceptSuccess { connection }),
on_would_block: callback!(|connection: Uid| TcpServerAction::AcceptTryAgain { connection }),
on_error: callback!(|(connection: Uid, error: String)| TcpServerAction::AcceptError { connection, error }),
});
}
ListenerEvent::AllAccepted => (),
ListenerEvent::Closed | ListenerEvent::Error => {
let Listener {
on_listener_closed, ..
} = state
.substate_mut::<TcpServerState>()
.remove_listener(&listener);
dispatcher.dispatch_back(&on_listener_closed, listener)
}
}
} else {
unreachable!()
}
}
}