#include "protoDispatcher.h"
#include <stdio.h>
#include <string.h>
// Platform-specific system headers plus the per-platform definitions of the
// ProtoDispatcher "invalid descriptor" and "wait error" sentinel constants.
// Each preprocessor directive sits on its own line: any tokens that follow
// "#endif" on the same line are discarded by the preprocessor, which would
// silently drop the constant definitions.
#ifdef WIN32
#include <TCHAR.h>
#ifndef _WIN32_WCE
#include <process.h>
#endif  // !_WIN32_WCE
const ProtoDispatcher::Descriptor ProtoDispatcher::INVALID_DESCRIPTOR = INVALID_HANDLE_VALUE;
const ProtoDispatcher::WaitStatus ProtoDispatcher::WAIT_ERROR = WAIT_FAILED;
#else
#include <unistd.h>
#include <sys/time.h>
#include <sys/select.h>
#include <fcntl.h>
#ifdef HAVE_SCHED
#include <sched.h>
#else
#include <sys/resource.h>
#endif  // if/else HAVE_SCHED
const ProtoDispatcher::Descriptor ProtoDispatcher::INVALID_DESCRIPTOR = -1;
const ProtoDispatcher::WaitStatus ProtoDispatcher::WAIT_ERROR = -1;
#endif  // if/else WIN32
// Base stream constructor: records the stream type. On WIN32 builds the
// "index" and "outdex" members are additionally initialized to -1.
// The "#endif" is kept on its own line so the opening brace of the body is
// not swallowed as trailing directive tokens.
ProtoDispatcher::Stream::Stream(Type theType)
  : type(theType)
#ifdef WIN32
    ,index(-1), outdex(-1)
#endif  // WIN32
{
}
// A SocketStream is a Stream of type SOCKET that keeps a pointer to the
// ProtoSocket it was created for.
ProtoDispatcher::SocketStream::SocketStream(ProtoSocket& theSocket)
  : Stream(SOCKET),
    socket(&theSocket)
{
    // nothing else to set up
}
// A ChannelStream is a Stream of type CHANNEL that keeps a pointer to the
// ProtoChannel it was created for.
ProtoDispatcher::ChannelStream::ChannelStream(ProtoChannel& theChannel)
  : Stream(CHANNEL),
    channel(&theChannel)
{
    // nothing else to set up
}
// A TimerStream is a Stream of type TIMER; its descriptor starts out as
// INVALID_DESCRIPTOR until one is assigned.
ProtoDispatcher::TimerStream::TimerStream()
  : Stream(TIMER),
    descriptor(INVALID_DESCRIPTOR)
{
    // nothing else to set up
}
// On UNIX builds, closes the timer descriptor if one is still held and
// resets it to INVALID_DESCRIPTOR. On other builds this is a no-op.
// The "#endif" is kept on its own line so the closing brace of the body is
// not discarded as trailing directive tokens.
ProtoDispatcher::TimerStream::~TimerStream()
{
#ifdef UNIX
    if (INVALID_DESCRIPTOR != descriptor)
    {
        close(descriptor);
        descriptor = INVALID_DESCRIPTOR;
    }
#endif  // UNIX
}
// An EventStream is a Stream of type EVENT that keeps a pointer to the
// ProtoEvent it was created for.
ProtoDispatcher::EventStream::EventStream(ProtoEvent& theEvent)
  : Stream(EVENT),
    event(&theEvent)
{
    // nothing else to set up
}
// The EventStream destructor has no work of its own to do.
ProtoDispatcher::EventStream::~EventStream()
{
    // intentionally empty
}
// A GenericStream is a Stream of type GENERIC wrapping a raw descriptor.
// It stores a pointer to itself in "myself" and starts with no callback
// and no client data.
ProtoDispatcher::GenericStream::GenericStream(Descriptor theDescriptor)
  : Stream(GENERIC),
    myself(this),
    descriptor(theDescriptor),
    callback(NULL),
    client_data(NULL)
{
    // nothing else to set up
}
// Constructs an idle dispatcher: not running, no thread, no controller, no
// prompt callback, and platform-specific wait state (WIN32 handle arrays,
// timer descriptor, kqueue / epoll descriptor) set to "none".
// Every preprocessor directive is on its own line; fused forms such as
// "#endif #else" or "#endif {" lose the trailing tokens and unbalance the
// conditional blocks.
ProtoDispatcher::ProtoDispatcher()
    : run(false), wait_status(WAIT_ERROR), exit_code(0), timer_delay(-1), precise_timing(false),
      thread_id((ThreadId)(NULL)), external_thread(false), priority_boost(false),
      thread_started(false), thread_signaled(false), thread_master((ThreadId)(NULL)),
      suspend_count(0), signal_count(0), controller(NULL),
      prompt_set(false), prompt_callback(NULL), prompt_client_data(NULL),
      break_stream(break_event)
#ifdef WIN32
      ,stream_handles_array(NULL), stream_ptrs_array(NULL),
      stream_array_size(0), stream_count(0), msg_window(NULL),
      actual_thread_handle(NULL)
#ifdef USE_WAITABLE_TIMER
      ,timer_active(false)
#endif  // USE_WAITABLE_TIMER
#else
#ifdef USE_TIMERFD
      ,timer_fd(-1)
#endif  // USE_TIMERFD
#if defined(USE_SELECT)
      // select() needs no extra descriptor state
#elif defined(USE_KQUEUE)
      ,kevent_queue(-1)
#elif defined(USE_EPOLL)
      ,epoll_fd(-1)
#else
#error "undefined async i/o mechanism"
#endif  // if/elif/else USE_SELECT/USE_KQUEUE/USE_EPOLL
#endif  // if/else WIN32/UNIX
{
}
// Destruction simply delegates to Destroy(), which stops the dispatcher and
// releases its streams.
ProtoDispatcher::~ProtoDispatcher()
{
    this->Destroy();
}
// Stops the dispatcher and tears down every stream still registered in
// stream_table, then empties the channel/socket/generic stream pools.
// On WIN32 the handle/pointer arrays are freed and Win32Cleanup() is called.
//
// The WIN32 array pointers are reset to NULL after deletion: the destructor
// calls Destroy() too, so without the reset an explicit Destroy() followed
// by destruction would delete the same arrays twice.
void ProtoDispatcher::Destroy()
{
    Stop();
    Stream* stream;
    StreamTable::Iterator iterator(stream_table);
    while (NULL != (stream = iterator.GetNextItem()))
    {
        switch (stream->GetType())
        {
            case Stream::CHANNEL:
                // Detach the channel from this dispatcher
                static_cast<ChannelStream*>(stream)->GetChannel().SetNotifier(NULL);
                break;
            case Stream::SOCKET:
                // Detach the socket from this dispatcher
                static_cast<SocketStream*>(stream)->GetSocket().SetNotifier(NULL);
                break;
            case Stream::GENERIC:
                ReleaseGenericStream(static_cast<GenericStream&>(*stream));
                break;
            case Stream::TIMER:
                // nothing to release for timer streams here
                break;
            case Stream::EVENT:
                ReleaseEventStream(static_cast<EventStream&>(*stream));
                break;
        }
    }
    ASSERT(stream_table.IsEmpty());
    channel_stream_pool.Destroy();
    socket_stream_pool.Destroy();
    generic_stream_pool.Destroy();
    // NOTE(review): event_stream_pool is not emptied here although the other
    // pools are -- confirm whether that is intentional.
#ifdef WIN32
    if (NULL != stream_handles_array)
    {
        delete[] stream_handles_array;
        delete[] stream_ptrs_array;
        // Reset so a repeated Destroy() does not free the arrays again
        stream_handles_array = NULL;
        stream_ptrs_array = NULL;
        stream_array_size = 0;
    }
    stream_count = 0;
    Win32Cleanup();
#endif  // WIN32
}
// Brings the dispatcher's notification state for "theChannel" in line with
// "notifyFlags".
//   - notifyFlags == 0 : any active notification is disabled and the
//                        channel's stream is released.
//   - otherwise        : input / output notification is enabled or disabled
//                        individually to match the NOTIFY_INPUT and
//                        NOTIFY_OUTPUT bits.
// On WIN32 the stream is also added to / removed from ready_stream_list
// according to the channel's current readiness.
// The whole update runs between SignalThread() and UnsignalThread().
// Returns false if the stream could not be obtained or an update failed.
bool ProtoDispatcher::UpdateChannelNotification(ProtoChannel& theChannel,
                                                int           notifyFlags)
{
    SignalThread();
    ChannelStream* const chanStream = GetChannelStream(theChannel);
    if (NULL == chanStream)
    {
        PLOG(PL_ERROR, "ProtoDispatcher::UpdateChannelNotification() new ChannelStream error: %s\n",
             GetErrorString());
        UnsignalThread();
        return false;
    }

    if (0 == notifyFlags)
    {
        // Nothing is wanted any more: disable what is active, then release
        if (chanStream->HasNotifyFlags() &&
            !UpdateStreamNotification(*chanStream, DISABLE_ALL))
        {
            PLOG(PL_ERROR, "ProtoDispatcher::UpdateChannelNotification() error: unable to DISABLE_ALL!\n");
            UnsignalThread();
            return false;
        }
        ReleaseChannelStream(*chanStream);
        UnsignalThread();
        return true;
    }
    else if (chanStream->GetNotifyFlags() != notifyFlags)
    {
        // Input side
        if (0 != (notifyFlags & ProtoChannel::NOTIFY_INPUT))
        {
            if (!chanStream->IsInput() &&
                !UpdateStreamNotification(*chanStream, ENABLE_INPUT))
            {
                PLOG(PL_ERROR, "ProtoDispatcher::UpdateChannelNotification() error: unable to ENABLE_INPUT!\n");
                UnsignalThread();
                return false;
            }
        }
        else if (chanStream->IsInput() &&
                 !UpdateStreamNotification(*chanStream, DISABLE_INPUT))
        {
            PLOG(PL_ERROR, "ProtoDispatcher::UpdateChannelNotification() error: unable to DISABLE_INPUT!\n");
            UnsignalThread();
            return false;
        }
        // Output side
        if (0 != (notifyFlags & ProtoChannel::NOTIFY_OUTPUT))
        {
            if (!chanStream->IsOutput() &&
                !UpdateStreamNotification(*chanStream, ENABLE_OUTPUT))
            {
                PLOG(PL_ERROR, "ProtoDispatcher::UpdateChannelNotification() error: unable to ENABLE_OUTPUT!\n");
                UnsignalThread();
                return false;
            }
        }
        else if (chanStream->IsOutput() &&
                 !UpdateStreamNotification(*chanStream, DISABLE_OUTPUT))
        {
            PLOG(PL_ERROR, "ProtoDispatcher::UpdateChannelNotification() error: unable to DISABLE_OUTPUT!\n");
            UnsignalThread();
            return false;
        }
    }
    else
    {
        // Requested flags already in effect; nothing to change
        ASSERT(0 != notifyFlags);
    }

#ifdef WIN32
    const bool channelReady =
        (theChannel.IsInputReady() && chanStream->IsInput()) ||
        (theChannel.IsOutputReady() && chanStream->IsOutput());
    if (channelReady)
    {
        if (!ready_stream_list.Contains(*chanStream) &&
            !ready_stream_list.Append(*chanStream))
        {
            ReleaseChannelStream(*chanStream);
            PLOG(PL_ERROR, "ProtoDispatcher::UpdateChannelNotification() error: unable to append ready channel!\n");
            UnsignalThread();
            return false;
        }
    }
    else if (ready_stream_list.Contains(*chanStream))
    {
        ready_stream_list.Remove(*chanStream);
    }
#endif
    UnsignalThread();
    return true;
}
// Brings the dispatcher's notification state for "theEvent" in line with
// "notifyFlags".
//   - flags unchanged and zero     : the event's stream is released
//   - flags unchanged and non-zero : nothing to do
//   - flags changed to non-zero    : input notification is enabled
//   - flags changed to zero        : input notification is disabled and the
//                                    stream is released
// The whole update runs between SignalThread() and UnsignalThread().
// Returns false if the stream could not be obtained or an update failed.
bool ProtoDispatcher::UpdateEventNotification(ProtoEvent& theEvent,
                                              int         notifyFlags)
{
    SignalThread();
    EventStream* const evtStream = GetEventStream(theEvent);
    if (NULL == evtStream)
    {
        PLOG(PL_ERROR, "ProtoDispatcher::UpdateEventNotification() new EventStream error: %s\n",
             GetErrorString());
        UnsignalThread();
        return false;
    }

    if (evtStream->GetNotifyFlags() == notifyFlags)
    {
        // No change requested; an unused stream goes back to the pool
        if (0 == notifyFlags)
            ReleaseEventStream(*evtStream);
    }
    else if (0 == notifyFlags)
    {
        if (!UpdateStreamNotification(*evtStream, DISABLE_INPUT))
        {
            PLOG(PL_ERROR, "ProtoDispatcher::UpdateEventNotification() error: unable to DISABLE_INPUT!\n");
            UnsignalThread();
            return false;
        }
        ASSERT(0 == evtStream->GetNotifyFlags());
        ReleaseEventStream(*evtStream);
    }
    else
    {
        ASSERT(!evtStream->NotifyFlagIsSet(EventStream::NOTIFY_INPUT));
        if (!UpdateStreamNotification(*evtStream, ENABLE_INPUT))
        {
            PLOG(PL_ERROR, "ProtoDispatcher::UpdateEventNotification() error: unable to ENABLE_INPUT!\n");
            UnsignalThread();
            return false;
        }
    }

    UnsignalThread();
    return true;
}
// Brings the dispatcher's notification state for "theSocket" in line with
// "notifyFlags".
//   - flags unchanged and zero     : the socket's stream is released
//   - flags unchanged and non-zero : no notification change
//   - flags changed to non-zero    : WIN32 registers the socket's event
//                                    handle and selects the matching
//                                    WSAEventSelect() mask; other platforms
//                                    enable/disable input and output
//                                    notification individually
//   - flags changed to zero        : notification is disabled and the
//                                    stream is released
// On WIN32 the stream is also added to / removed from ready_stream_list
// according to the socket's current readiness.
// The whole update runs between SignalThread() and UnsignalThread().
// Returns false if the stream could not be obtained or an update failed.
//
// Each preprocessor directive is on its own line: the fused forms
// ("#endif }", "#endif if (...)", "} #ifdef WIN32", "#endif UnsignalThread();")
// either drop the trailing code or are not recognized as directives at all.
bool ProtoDispatcher::UpdateSocketNotification(ProtoSocket& theSocket,
                                               int          notifyFlags)
{
    SignalThread();
    SocketStream* socketStream = GetSocketStream(theSocket);
    if (NULL != socketStream)
    {
        if (socketStream->GetNotifyFlags() == notifyFlags)
        {
            if (0 == notifyFlags)
            {
                // Unused stream: hand it back
                ReleaseSocketStream(*socketStream);
                UnsignalThread();
                return true;
            }
        }
        else if (0 != notifyFlags)
        {
#ifdef WIN32
            // Make sure the socket's event handle is in the wait array
            int index = socketStream->GetIndex();
            if (index < 0)
            {
                if ((index = Win32AddStream(*socketStream, theSocket.GetInputEventHandle())) < 0)
                {
                    PLOG(PL_ERROR, "ProtoDispatcher::UpdateSocketNotification() error adding handle\n");
                    ReleaseSocketStream(*socketStream);
                    UnsignalThread();
                    return false;
                }
                socketStream->SetIndex(index);
            }
            // Translate the notify flags into a WSAEventSelect() mask
            long eventMask = 0;
            if (0 != (notifyFlags & ProtoSocket::NOTIFY_INPUT))
                eventMask |= (FD_READ | FD_ACCEPT | FD_CLOSE);
            if (0 != (notifyFlags & ProtoSocket::NOTIFY_OUTPUT))
            {
                eventMask |= (FD_WRITE | FD_CONNECT | FD_CLOSE);
            }
            if (0 != (notifyFlags & ProtoSocket::NOTIFY_EXCEPTION))
                eventMask |= FD_ADDRESS_LIST_CHANGE;
            if (0 != WSAEventSelect(theSocket.GetHandle(), theSocket.GetInputEventHandle(), eventMask))
            {
                ReleaseSocketStream(*socketStream);
                PLOG(PL_ERROR, "ProtoDispatcher::UpdateSocketNotification() WSAEventSelect(0x%x) error: %s\n",
                     eventMask, ProtoSocket::GetErrorString());
                UnsignalThread();
                return false;
            }
            socketStream->SetNotifyFlags(notifyFlags);
#else
            // Input side
            if (0 != (notifyFlags & ProtoSocket::NOTIFY_INPUT))
            {
                if (!socketStream->IsInput())
                {
                    if (!UpdateStreamNotification(*socketStream, ENABLE_INPUT))
                    {
                        PLOG(PL_ERROR, "ProtoDispatcher::UpdateSocketNotification() error: unable to ENABLE_INPUT!\n");
                        UnsignalThread();
                        return false;
                    }
                }
            }
            else
            {
                if (socketStream->IsInput())
                {
                    if (!UpdateStreamNotification(*socketStream, DISABLE_INPUT))
                    {
                        PLOG(PL_ERROR, "ProtoDispatcher::UpdateSocketNotification() error: unable to DISABLE_INPUT!\n");
                        UnsignalThread();
                        return false;
                    }
                }
            }
            // Output side
            if (0 != (notifyFlags & ProtoSocket::NOTIFY_OUTPUT))
            {
                if (!socketStream->IsOutput())
                {
                    if (!UpdateStreamNotification(*socketStream, ENABLE_OUTPUT))
                    {
                        PLOG(PL_ERROR, "ProtoDispatcher::UpdateSocketNotification() error: unable to ENABLE_OUTPUT!\n");
                        UnsignalThread();
                        return false;
                    }
                }
            }
            else
            {
                if (socketStream->IsOutput())
                {
                    if (!UpdateStreamNotification(*socketStream, DISABLE_OUTPUT))
                    {
                        PLOG(PL_ERROR, "ProtoDispatcher::UpdateSocketNotification() error: unable to DISABLE_OUTPUT!\n");
                        UnsignalThread();
                        return false;
                    }
                }
            }
#endif  // if/else WIN32/UNIX
        }
        else
        {
            // Flags went from non-zero to zero: stop notification, release
            ASSERT(socketStream->HasNotifyFlags());
#ifdef WIN32
            if (0 != WSAEventSelect(theSocket.GetHandle(), theSocket.GetInputEventHandle(), 0))
                PLOG(PL_WARN, "ProtoDispatcher::UpdateSocketNotification() WSAEventSelect(0) warning: %s\n",
                     ProtoSocket::GetErrorString());
#endif  // WIN32
            if (!UpdateStreamNotification(*socketStream, DISABLE_ALL))
            {
                PLOG(PL_ERROR, "ProtoDispatcher::UpdateSocketNotification() error: unable to DISABLE_ALL!\n");
                UnsignalThread();
                return false;
            }
            ReleaseSocketStream(*socketStream);
            UnsignalThread();
            return true;
        }
#ifdef WIN32
        // Keep ready_stream_list consistent with the socket's readiness
        if ((theSocket.IsInputReady() && socketStream->IsInput()) ||
            (theSocket.IsOutputReady() && socketStream->IsOutput()))
        {
            if (!ready_stream_list.Contains(*socketStream))
            {
                if (!ready_stream_list.Append(*socketStream))
                {
                    ReleaseSocketStream(*socketStream);
                    PLOG(PL_ERROR, "ProtoDispatcher::UpdateSocketNotification() error: unable to append ready socket!\n");
                    UnsignalThread();
                    return false;
                }
            }
        }
        else if (ready_stream_list.Contains(*socketStream))
        {
            ready_stream_list.Remove(*socketStream);
        }
#endif  // WIN32
        UnsignalThread();
        return true;
    }
    else
    {
        PLOG(PL_ERROR, "ProtoDispatcher::UpdateSocketNotification() new SocketStream error: %s\n",
             GetErrorString());
        UnsignalThread();
        return false;
    }
}
// Returns the SocketStream registered for "theSocket" in stream_table,
// creating one if needed. A new entry is taken from socket_stream_pool when
// the pool has one (its flags are cleared and its socket re-pointed),
// otherwise it is allocated. The lookup key is the socket's address, with
// the key size given in bits.
// Returns NULL if a new stream could not be allocated.
ProtoDispatcher::SocketStream* ProtoDispatcher::GetSocketStream(ProtoSocket& theSocket)
{
    ProtoSocket* lookupKey = &theSocket;
    SocketStream* existing =
        static_cast<SocketStream*>(stream_table.Find((const char*)&lookupKey, sizeof(ProtoSocket*) << 3));
    if (NULL != existing)
        return existing;

    SocketStream* fresh = socket_stream_pool.Get();
    if (NULL == fresh)
    {
        // Pool is empty, so allocate
        if (!(fresh = new SocketStream(theSocket)))
        {
            PLOG(PL_ERROR, "ProtoDispatcher::GetSocketStream() new SocketStream error: %s\n", GetErrorString());
            return NULL;
        }
    }
    else
    {
        // Recycle a pooled stream for this socket
        fresh->ClearNotifyFlags();
        fresh->SetSocket(theSocket);
    }
    stream_table.Insert(*fresh);
    return fresh;
}
// Retires a SocketStream: on WIN32 it is dropped from ready_stream_list and
// its wait-array slots (index / outdex) are removed; any remaining
// notification is disabled; finally it is removed from stream_table and
// returned to socket_stream_pool.
//
// The "#endif" is on its own line so that the HasNotifyFlags() condition
// following it is not discarded (which would make the disable step run
// unconditionally).
void ProtoDispatcher::ReleaseSocketStream(SocketStream& socketStream)
{
#ifdef WIN32
    if (ready_stream_list.Contains(socketStream))
        ready_stream_list.Remove(socketStream);
    if (socketStream.GetIndex() >= 0)
    {
        Win32RemoveStream(socketStream.GetIndex());
        socketStream.SetIndex(-1);
    }
    if (socketStream.GetOutdex() >= 0)
    {
        Win32RemoveStream(socketStream.GetOutdex());
        socketStream.SetOutdex(-1);
    }
#endif  // WIN32
    if (socketStream.HasNotifyFlags())
    {
        if (!UpdateStreamNotification(socketStream, DISABLE_ALL))
            PLOG(PL_ERROR, "ProtoDispatcher::ReleaseSocketStream() error: UpdateStreamNotification(DISABLE_ALL) failure!\n");
        // Clear regardless, so the pooled stream starts clean
        socketStream.ClearNotifyFlags();
    }
    stream_table.Remove(socketStream);
    socket_stream_pool.Put(socketStream);
}
// Returns the ChannelStream registered for "theChannel" in stream_table,
// creating one if needed. A new entry is taken from channel_stream_pool when
// the pool has one (its flags are cleared and its channel re-pointed),
// otherwise it is allocated. The lookup key is the channel's address, with
// the key size given in bits.
// Returns NULL if a new stream could not be allocated.
ProtoDispatcher::ChannelStream* ProtoDispatcher::GetChannelStream(ProtoChannel& theChannel)
{
    ProtoChannel* lookupKey = &theChannel;
    ChannelStream* existing =
        static_cast<ChannelStream*>(stream_table.Find((const char*)&lookupKey, sizeof(ProtoChannel*) << 3));
    if (NULL != existing)
        return existing;

    ChannelStream* fresh = channel_stream_pool.Get();
    if (NULL == fresh)
    {
        // Pool is empty, so allocate
        if (NULL == (fresh = new ChannelStream(theChannel)))
        {
            PLOG(PL_ERROR, "ProtoDispatcher::GetChannelStream() new ChannelStream error: %s\n", GetErrorString());
            return NULL;
        }
    }
    else
    {
        // Recycle a pooled stream for this channel
        fresh->ClearNotifyFlags();
        fresh->SetChannel(theChannel);
    }
    stream_table.Insert(*fresh);
    return fresh;
}
// Retires a ChannelStream: on WIN32 it is dropped from ready_stream_list and
// its wait-array slots (index / outdex) are removed; then it is removed from
// stream_table, its notify flags are cleared, and it is returned to
// channel_stream_pool.
//
// The "#endif" is on its own line so that the stream_table.Remove() call
// following it is not discarded as trailing directive tokens.
void ProtoDispatcher::ReleaseChannelStream(ChannelStream& channelStream)
{
#ifdef WIN32
    if (ready_stream_list.Contains(channelStream))
        ready_stream_list.Remove(channelStream);
    if (channelStream.GetIndex() >= 0)
    {
        Win32RemoveStream(channelStream.GetIndex());
        channelStream.SetIndex(-1);
    }
    if (channelStream.GetOutdex() >= 0)
    {
        Win32RemoveStream(channelStream.GetOutdex());
        channelStream.SetOutdex(-1);
    }
#endif  // WIN32
    stream_table.Remove(channelStream);
    channelStream.ClearNotifyFlags();
    channel_stream_pool.Put(channelStream);
}
// Returns the EventStream registered for "theEvent" in stream_table,
// creating one if needed. A new entry is taken from event_stream_pool when
// the pool has one (its flags are cleared and its event re-pointed),
// otherwise it is allocated. The lookup key is the event's address, with
// the key size given in bits.
// Returns NULL if a new stream could not be allocated.
ProtoDispatcher::EventStream* ProtoDispatcher::GetEventStream(ProtoEvent& theEvent)
{
    ProtoEvent* lookupKey = &theEvent;
    EventStream* existing =
        static_cast<EventStream*>(stream_table.Find((const char*)&lookupKey, sizeof(ProtoEvent*) << 3));
    if (NULL != existing)
        return existing;

    EventStream* fresh = event_stream_pool.Get();
    if (NULL == fresh)
    {
        // Pool is empty, so allocate
        if (NULL == (fresh = new EventStream(theEvent)))
        {
            PLOG(PL_ERROR, "ProtoDispatcher::GetEventStream() new EventStream error: %s\n", GetErrorString());
            return NULL;
        }
    }
    else
    {
        // Recycle a pooled stream for this event
        fresh->ClearNotifyFlags();
        fresh->SetEvent(theEvent);
    }
    stream_table.Insert(*fresh);
    return fresh;
}
// Retires an EventStream: removes it from stream_table, clears its notify
// flags, and returns it to event_stream_pool for reuse.
void ProtoDispatcher::ReleaseEventStream(EventStream& eventStream)
{
    EventStream& retired = eventStream;
    stream_table.Remove(retired);       // no longer findable by its event
    retired.ClearNotifyFlags();         // pooled streams carry no flags
    event_stream_pool.Put(retired);     // available for GetEventStream()
}
// Registers "callback" (with "clientData") for input notification on
// "descriptor". A GenericStream is created for the descriptor if none
// exists; input notification is enabled if it is not already.
// The whole update runs between SignalThread() and UnsignalThread().
//
// Returns false if the stream cannot be obtained or input notification
// cannot be enabled. A NULL stream is reported as an error and returned on;
// without the early return the NULL pointer would be dereferenced by the
// SetNotifyFlag() call below.
bool ProtoDispatcher::InstallGenericInput(ProtoDispatcher::Descriptor descriptor,
                                          ProtoDispatcher::Callback*  callback,
                                          const void*                 clientData)
{
    SignalThread();
    GenericStream* stream = GetGenericStream(descriptor);
    if (NULL == stream)
    {
        PLOG(PL_ERROR, "ProtoDispatcher::InstallGenericInput() error: unable to get generic stream!\n");
        UnsignalThread();
        return false;
    }
    if (!stream->IsInput())
    {
        // Not yet watching for input on this descriptor
        if (!UpdateStreamNotification(*stream, ENABLE_INPUT))
        {
            PLOG(PL_ERROR, "ProtoDispatcher::InstallGenericInput() error: unable to ENABLE_INPUT!\n");
            // Do not keep a stream around that has nothing enabled
            if (!stream->HasNotifyFlags()) ReleaseGenericStream(*stream);
            UnsignalThread();
            return false;
        }
    }
    stream->SetNotifyFlag(Stream::NOTIFY_INPUT);
    stream->SetCallback(callback, clientData);
    UnsignalThread();
    return true;
}
// Stops input notification for "descriptor" if a GenericStream exists for
// it, and releases the stream once it has no notify flags left.
// A failure to disable notification is logged but the input flag is
// cleared regardless. Runs between SignalThread() and UnsignalThread().
void ProtoDispatcher::RemoveGenericInput(Descriptor descriptor)
{
    SignalThread();
    GenericStream* const target = FindGenericStream(descriptor);
    if (NULL != target)
    {
        if (target->IsInput())
        {
            const bool disabled = UpdateStreamNotification(*target, DISABLE_INPUT);
            if (!disabled)
            {
                PLOG(PL_ERROR, "ProtoDispatcher::RemoveGenericInput() error: UpdateStreamNotification(DISABLE_INPUT) failure!\n");
            }
            target->UnsetNotifyFlag(Stream::NOTIFY_INPUT);
        }
        if (!target->HasNotifyFlags())
        {
            ReleaseGenericStream(*target);
        }
    }
    UnsignalThread();
}
// Registers "callback" (with "clientData") for output notification on
// "descriptor". A GenericStream is created for the descriptor if none
// exists; output notification is enabled if it is not already.
// The whole update runs between SignalThread() and UnsignalThread().
//
// The stream is obtained with GetGenericStream() (find-or-create), matching
// InstallGenericInput(). A find-only lookup returns NULL for a descriptor
// that has no stream yet, and that NULL would then be dereferenced.
// Returns false if the stream cannot be obtained or output notification
// cannot be enabled.
bool ProtoDispatcher::InstallGenericOutput(ProtoDispatcher::Descriptor descriptor,
                                           ProtoDispatcher::Callback*  callback,
                                           const void*                 clientData)
{
    SignalThread();
    GenericStream* stream = GetGenericStream(descriptor);
    if (NULL == stream)
    {
        PLOG(PL_ERROR, "ProtoDispatcher::InstallGenericOutput() error: unable to get generic stream!\n");
        UnsignalThread();
        return false;
    }
    if (!stream->IsOutput())
    {
        // Not yet watching for output on this descriptor
        if (!UpdateStreamNotification(*stream, ENABLE_OUTPUT))
        {
            PLOG(PL_ERROR, "ProtoDispatcher::InstallGenericOutput() error: unable to ENABLE_OUTPUT!\n");
            // Do not keep a stream around that has nothing enabled
            if (!stream->HasNotifyFlags()) ReleaseGenericStream(*stream);
            UnsignalThread();
            return false;
        }
    }
    stream->SetNotifyFlag(Stream::NOTIFY_OUTPUT);
    stream->SetCallback(callback, clientData);
    UnsignalThread();
    return true;
}
// Stops output notification for "descriptor" if a GenericStream exists for
// it, and releases the stream once it has no notify flags left.
// A failure to disable notification is logged but the output flag is
// cleared regardless. Runs between SignalThread() and UnsignalThread().
void ProtoDispatcher::RemoveGenericOutput(Descriptor descriptor)
{
    SignalThread();
    GenericStream* const target = FindGenericStream(descriptor);
    if (NULL != target)
    {
        if (target->IsOutput())
        {
            const bool disabled = UpdateStreamNotification(*target, DISABLE_OUTPUT);
            if (!disabled)
            {
                PLOG(PL_ERROR, "ProtoDispatcher::RemoveGenericOutput() error: UpdateStreamNotification(DISABLE_OUTPUT) failure!\n");
            }
            target->UnsetNotifyFlag(Stream::NOTIFY_OUTPUT);
        }
        if (!target->HasNotifyFlags())
        {
            ReleaseGenericStream(*target);
        }
    }
    UnsignalThread();
}
// Returns the GenericStream registered for "descriptor" in
// generic_stream_table, creating one if needed. A new entry is taken from
// generic_stream_pool when the pool has one (flags cleared, descriptor
// reassigned), otherwise it is allocated. New streams are inserted into
// both stream_table and generic_stream_table.
// Returns NULL if allocation fails or the stream cannot be added to
// generic_stream_table (in which case it is released again).
ProtoDispatcher::GenericStream* ProtoDispatcher::GetGenericStream(Descriptor descriptor)
{
    GenericStream* existing = generic_stream_table.FindByDescriptor(descriptor);
    if (NULL != existing)
        return existing;

    GenericStream* fresh = generic_stream_pool.Get();
    if (NULL == fresh)
    {
        // Pool is empty, so allocate
        if (NULL == (fresh = new GenericStream(descriptor)))
        {
            PLOG(PL_ERROR, "ProtoDispatcher::GetGenericStream() new GenericStream error: %s\n", GetErrorString());
            return NULL;
        }
    }
    else
    {
        // Recycle a pooled stream for this descriptor
        fresh->ClearNotifyFlags();
        fresh->SetDescriptor(descriptor);
    }
    stream_table.Insert(*fresh);
    const bool indexed = generic_stream_table.Insert(*fresh);
    if (!indexed)
    {
        PLOG(PL_ERROR, "ProtoDispatcher::GetGenericStream() error: unable to add to table: %s\n", GetErrorString());
        ReleaseGenericStream(*fresh);
        return NULL;
    }
    return fresh;
}
// Retires a GenericStream: disables whichever of input / output
// notification is still active (failures are logged only), clears its
// flags, removes it from stream_table and generic_stream_table, and returns
// it to generic_stream_pool.
void ProtoDispatcher::ReleaseGenericStream(GenericStream& genericStream)
{
    if (genericStream.IsInput() &&
        !UpdateStreamNotification(genericStream, DISABLE_INPUT))
    {
        PLOG(PL_ERROR, "ProtoDispatcher::ReleaseGenericStream() error: UpdateStreamNotification(DISABLE_INPUT) failure!\n");
    }
    if (genericStream.IsOutput() &&
        !UpdateStreamNotification(genericStream, DISABLE_OUTPUT))
    {
        PLOG(PL_ERROR, "ProtoDispatcher::ReleaseGenericStream() error: UpdateStreamNotification(DISABLE_OUTPUT) failure!\n");
    }

    genericStream.ClearNotifyFlags();

    // Unregister from both lookup tables before pooling
    stream_table.Remove(genericStream);
    generic_stream_table.Remove(genericStream);
    generic_stream_pool.Put(genericStream);
}
// Attempts to raise scheduling priority.
//   WINCE      : not supported; logs an error and returns false.
//   WIN32      : sets THREAD_PRIORITY_TIME_CRITICAL on the dispatcher
//                thread (or the calling thread when not threaded).
//   HAVE_SCHED : tries SCHED_FIFO at maximum priority, falling back to
//                SCHED_OTHER at its maximum with a warning.
//   otherwise  : setpriority() to -20 for this process.
// Returns true on success, false if the priority could not be changed.
//
// The closing "#endif" directives are each on their own line; in the fused
// form "#endif #endif return true;" only the first directive is honored,
// leaving the outer conditional open and the final return statement lost.
bool ProtoDispatcher::BoostPriority()
{
#ifdef WINCE
    PLOG(PL_ERROR, "ProtoDispatcher::BoostPriority() not supported on WinCE");
    return false;
#elif defined(WIN32)
    ThreadId threadId = IsThreaded() ? thread_id : ::GetCurrentThreadId();
    HANDLE threadHandle = OpenThread(THREAD_SET_INFORMATION, FALSE, threadId);
    if (NULL == threadHandle)
    {
        PLOG(PL_ERROR, "ProtoDispatcher::BoostPriority() error: OpenThread() error: %s\n", GetErrorString());
        return false;
    }
    if (!SetThreadPriority(threadHandle, THREAD_PRIORITY_TIME_CRITICAL))
    {
        PLOG(PL_ERROR, "ProtoDispatcher::BoostPriority() error: SetThreadPriority() error: %s\n", GetErrorString());
        CloseHandle(threadHandle);
        return false;
    }
    CloseHandle(threadHandle);
#else
#ifdef HAVE_SCHED
    struct sched_param schp;
    memset(&schp, 0, sizeof(schp));
    schp.sched_priority = sched_get_priority_max(SCHED_FIFO);
    if (0 != sched_setscheduler(0, SCHED_FIFO, &schp))
    {
        // SCHED_FIFO was refused; settle for the best SCHED_OTHER priority
        schp.sched_priority = sched_get_priority_max(SCHED_OTHER);
        if (0 != sched_setscheduler(0, SCHED_OTHER, &schp))
        {
            PLOG(PL_ERROR, "ProtoDispatcher::BoostPriority() error: sched_setscheduler() error: %s\n", GetErrorString());
            return false;
        }
        else
        {
            PLOG(PL_WARN, "ProtoDispatcher::BoostPriority() warning: unable to set SCHED_FIFO boost, SCHED_OTHER set.\n"
                          " (run as root or sudofor full SCHED_FIFO priority boost)\n");
        }
    }
#else
    if (0 != setpriority(PRIO_PROCESS, getpid(), -20))
    {
        PLOG(PL_ERROR, "ProtoDispatcher::BoostPriority() error: setpriority() error: %s\n", GetErrorString());
        return false;
    }
#endif  // if/else HAVE_SCHED
#endif  // if/elif/else WINCE/WIN32/UNIX
    return true;
}
// Runs the dispatcher's main loop and returns exit_code.
//   oneShot : when true the loop body executes at most once.
// Before looping, the optional priority boost is applied and the timer
// resource (timerfd or WIN32 waitable timer, if configured) is created;
// both are torn down again after the loop. Returns -1 if the timer resource
// cannot be set up.
//
// Each iteration, while IsPending() is true: the time to the next timeout
// is fetched, the dispatcher waits, and then dispatches. In threaded mode
// the wait is wrapped so suspend_mutex is released (and signal_mutex held)
// only while waiting, a pending prompt callback is serviced, and dispatch
// is delegated to the controller when one is installed. The loop ends when
// "run" is cleared or nothing is pending.
//
// Fixes relative to the fused-directive form of this function:
//   - "#endif" / "#ifdef" directives are each on their own line so the
//     waitable-timer cleanup stays conditional and "return exit_code;" is
//     not dropped.
//   - CreateWaitableTimer() reports failure by returning NULL, so NULL is
//     checked (INVALID_HANDLE_VALUE is still treated as failure as well).
int ProtoDispatcher::Run(bool oneShot)
{
    exit_code = 0;
    wait_status = WAIT_ERROR;
    if (priority_boost) BoostPriority();
#ifdef USE_TIMERFD
    int tfd = timerfd_create(CLOCK_MONOTONIC, 0);
    if (tfd < 0)
    {
        PLOG(PL_ERROR, "ProtoDispatcher::Run() timerfd_create() error: %s\n", GetErrorString());
        return -1;
    }
#ifdef USE_EPOLL
    if (!EpollChange(tfd, EPOLLIN, EPOLL_CTL_ADD, &timer_stream))
    {
        PLOG(PL_ERROR, "ProtoDispatcher::Run() error EpolLChange(timer_fd) failed!\n");
        close(tfd);
        return -1;
    }
#endif  // USE_EPOLL
    timer_stream.SetDescriptor(tfd);
#endif  // USE_TIMERFD
#ifdef USE_WAITABLE_TIMER
    HANDLE wtm = CreateWaitableTimer(NULL, TRUE, NULL);
    if ((NULL == wtm) || (INVALID_HANDLE_VALUE == wtm))
    {
        PLOG(PL_ERROR, "ProtoDispatcher::Run() CreateWaitableTimer() error: %s\n", GetErrorString());
        return -1;
    }
    timer_stream.SetDescriptor(wtm);
    timer_stream.SetIndex(-1);
    timer_active = false;
#endif  // USE_WAITABLE_TIMER
    run = oneShot ? false : true;
    do
    {
        if (IsPending())
        {
            timer_delay = ProtoTimerMgr::GetTimeRemaining();
            if (IsThreaded())
            {
                // Release suspend_mutex only for the duration of the wait
                Lock(signal_mutex);
                Unlock(suspend_mutex);
                Wait();
                Unlock(signal_mutex);
                Lock(suspend_mutex);
                if (prompt_set)
                {
                    if (NULL != prompt_callback)
                        prompt_callback(prompt_client_data);
                    prompt_set = false;
                }
                if (NULL != controller)
                {
                    // The controller performs the dispatch; suspend_mutex
                    // is released around the call
                    Unlock(suspend_mutex);
                    controller->DoDispatch();
                    Lock(suspend_mutex);
                }
                else
                {
                    Dispatch();
                }
            }
            else
            {
                Wait();
                Dispatch();
            }
        }
        else
        {
            PLOG(PL_DEBUG, "ProtoDispatcher::Run() would be stuck with infinite timeout & no inputs!\n");
            break;
        }
    } while (run);
#ifdef USE_TIMERFD
    close(timer_stream.GetDescriptor());
    timer_stream.SetDescriptor(INVALID_DESCRIPTOR);
#endif  // USE_TIMERFD
#ifdef USE_WAITABLE_TIMER
    if (timer_active)
    {
        CancelWaitableTimer(timer_stream.GetDescriptor());
        timer_active = false;
    }
    if (timer_stream.GetIndex() >= 0)
    {
        Win32RemoveStream(timer_stream.GetIndex());
        timer_stream.SetIndex(-1);
    }
    CloseHandle(timer_stream.GetDescriptor());
    timer_stream.SetDescriptor(INVALID_DESCRIPTOR);
#endif  // USE_WAITABLE_TIMER
    return exit_code;
}
void ProtoDispatcher::Stop(int exitCode)
{
if (controller)
{
controller->OnThreadStop();
controller = NULL;
}
SignalThread();
exit_code = run ? exitCode : exit_code;
run = false;
#ifdef WIN32
if (msg_window)
PostMessage(msg_window, WM_DESTROY, 0, 0);
#endif UnsignalThread();
DestroyThread();
}
#ifdef WIN32
#ifdef _WIN32_WCE
DWORD WINAPI ProtoDispatcher::DoThreadStart(LPVOID param)
#else
unsigned int __stdcall ProtoDispatcher::DoThreadStart(void* param)
#endif #else
void* ProtoDispatcher::DoThreadStart(void* param)
#endif {
ProtoDispatcher* dp = reinterpret_cast<ProtoDispatcher*>(param);
ASSERT(NULL != dp);
if (NULL != dp->controller) Lock(dp->controller->lock_b);
Lock(dp->suspend_mutex);
dp->thread_started = true;
dp->exit_status = dp->Run(); Unlock(dp->suspend_mutex);
DoThreadExit(dp->GetExitStatus());
return (dp->GetExitStatus());
}
bool ProtoDispatcher::StartThread(bool priorityBoost,
ProtoDispatcher::Controller* theController,
ThreadId threadId)
{
if (IsThreaded())
{
PLOG(PL_ERROR, "ProtoDispatcher::StartThread() error: thread already started\n");
return false;
}
priority_boost = priorityBoost;
if (!InstallBreak())
{
PLOG(PL_ERROR, "ProtoDispatcher::StartThread() error: InstallBreak() failed\n");
return false;
}
controller = theController;
Init(suspend_mutex);
Init(signal_mutex);
Lock(suspend_mutex);
if ((ThreadId)NULL == threadId)
{
#ifdef WIN32
#ifdef _WIN32_WCE
if (!(actual_thread_handle = CreateThread(NULL, 0, DoThreadStart, this, 0, &thread_id)))
#else
if (!(actual_thread_handle = (HANDLE)_beginthreadex(NULL, 0, DoThreadStart, this, 0, (unsigned*)&thread_id)))
#endif #else
if (0 != pthread_create(&thread_id, NULL, DoThreadStart, this))
#endif {
PLOG(PL_ERROR, "ProtoDispatcher::StartThread() create thread error: %s\n", GetErrorString());
RemoveBreak();
Unlock(suspend_mutex); thread_id = (ThreadId)NULL;
controller = NULL;
return false;
}
external_thread = false;
Unlock(suspend_mutex);
}
else
{
ASSERT(threadId == GetCurrentThread());
thread_id = threadId; external_thread = true;
thread_started = true;
exit_status = Run();
Unlock(suspend_mutex);
}
return true;
}
bool ProtoDispatcher::PromptThread()
{
if (SuspendThread())
{
prompt_set = true; if (!SignalThread())
{
ResumeThread();
return false; }
UnsignalThread();
ResumeThread();
return true;
}
else
{
return false;
}
}
bool ProtoDispatcher::SignalThread()
{
SuspendThread();
ThreadId currentThread = GetCurrentThread();
if (IsThreaded() && (currentThread != thread_id))
{
if (signal_count > 0)
{
signal_count++;
return true;
}
else
{
if (!SetBreak())
{
PLOG(PL_ERROR, "ProtoDispatcher::SignalThread() error: SetBreak() failed!\n");
ResumeThread();
return false;
}
Lock(signal_mutex);
signal_count = 1;
thread_signaled = true;
}
}
return true;
}
void ProtoDispatcher::UnsignalThread()
{
ThreadId currentThread = GetCurrentThread();
if (IsThreaded() && (currentThread != thread_id) && (thread_master == currentThread))
{
ASSERT(0 != signal_count);
signal_count--;
if (0 == signal_count)
Unlock(signal_mutex);
}
ResumeThread();
}
bool ProtoDispatcher::SuspendThread()
{
ThreadId currentThread = GetCurrentThread();
if (IsThreaded() && (currentThread != thread_id))
{
if (currentThread == thread_master)
{
suspend_count++;
return true;
}
while (!thread_started);
Lock(suspend_mutex); thread_master = currentThread;
suspend_count = 1;
}
return true;
}
void ProtoDispatcher::ResumeThread()
{
ThreadId currentThread = GetCurrentThread();
if (IsThreaded() && (currentThread != thread_id))
{
if (currentThread == thread_master)
{
if (suspend_count > 1)
{
suspend_count--;
}
else
{
thread_master = (ThreadId)NULL;
suspend_count = 0;
Unlock(suspend_mutex);
}
}
}
}
void ProtoDispatcher::DestroyThread()
{
if (IsThreaded())
{
controller = NULL;
if (!external_thread)
{
#ifdef WIN32
if (!IsMyself()) WaitForSingleObject(actual_thread_handle, INFINITE);
#ifdef _WIN32_WCE
CloseHandle(actual_thread_handle);
actual_thread_handle = NULL;
#endif #else
if (!IsMyself()) pthread_join(thread_id, NULL);
#endif }
thread_started = false;
thread_id = (ThreadId)NULL;
external_thread = false;
RemoveBreak();
Destroy(suspend_mutex);
Destroy(signal_mutex);
}
}
#ifdef UNIX
#if defined(USE_SELECT)
// select() flavor: applies "cmd" to the stream's notify flags. For the
// DISABLE_* commands the stream's valid input / output handle is also
// cleared from input_set / output_set. Always returns true.
bool ProtoDispatcher::UpdateStreamNotification(Stream& stream, NotificationCommand cmd)
{
    // Which descriptor sets need the stream's handle cleared afterwards
    const bool dropInput  = (DISABLE_INPUT == cmd)  || (DISABLE_ALL == cmd);
    const bool dropOutput = (DISABLE_OUTPUT == cmd) || (DISABLE_ALL == cmd);

    // Flag bookkeeping first
    switch (cmd)
    {
        case ENABLE_INPUT:
            stream.SetNotifyFlag(Stream::NOTIFY_INPUT);
            break;
        case DISABLE_INPUT:
            stream.UnsetNotifyFlag(Stream::NOTIFY_INPUT);
            break;
        case ENABLE_OUTPUT:
            stream.SetNotifyFlag(Stream::NOTIFY_OUTPUT);
            break;
        case DISABLE_OUTPUT:
            stream.UnsetNotifyFlag(Stream::NOTIFY_OUTPUT);
            break;
        case DISABLE_ALL:
            stream.ClearNotifyFlags();
            break;
    }

    if (dropInput && (INVALID_DESCRIPTOR != stream.GetInputHandle()))
    {
        FD_CLR(stream.GetInputHandle(), &input_set);
    }
    if (dropOutput && (INVALID_DESCRIPTOR != stream.GetOutputHandle()))
    {
        FD_CLR(stream.GetOutputHandle(), &output_set);
    }
    return true;
}
#elif defined(USE_KQUEUE)
// kqueue flavor: applies "cmd" by adding/enabling or disabling the matching
// EVFILT_READ / EVFILT_WRITE kevent for the stream's handle, then updates
// the stream's notify flags. For the single-direction commands a
// KeventChange() failure is logged and false is returned without touching
// the flags. DISABLE_ALL only warns on failure and always clears the flags.
//
// The ENABLE_OUTPUT and DISABLE_OUTPUT failure messages name their own
// command (they previously both reported "DISABLE_INPUT").
bool ProtoDispatcher::UpdateStreamNotification(Stream& stream, NotificationCommand cmd)
{
    switch (cmd)
    {
        case ENABLE_INPUT:
            if (!KeventChange(stream.GetInputHandle(), EVFILT_READ, EV_ADD | EV_ENABLE, &stream))
            {
                PLOG(PL_ERROR, "ProtoDispatcher::UpdateStreamNotification(ENABLE_INPUT) KeventChange() error!\n");
                return false;
            }
            stream.SetNotifyFlag(Stream::NOTIFY_INPUT);
            break;
        case DISABLE_INPUT:
            if (!KeventChange(stream.GetInputHandle(), EVFILT_READ, EV_DISABLE, &stream))
            {
                PLOG(PL_ERROR, "ProtoDispatcher::UpdateStreamNotification(DISABLE_INPUT) KeventChange() error!\n");
                return false;
            }
            stream.UnsetNotifyFlag(Stream::NOTIFY_INPUT);
            break;
        case ENABLE_OUTPUT:
            if (!KeventChange(stream.GetOutputHandle(), EVFILT_WRITE, EV_ADD | EV_ENABLE, &stream))
            {
                PLOG(PL_ERROR, "ProtoDispatcher::UpdateStreamNotification(ENABLE_OUTPUT) KeventChange() error!\n");
                return false;
            }
            stream.SetNotifyFlag(Stream::NOTIFY_OUTPUT);
            break;
        case DISABLE_OUTPUT:
            if (!KeventChange(stream.GetOutputHandle(), EVFILT_WRITE, EV_DISABLE, &stream))
            {
                PLOG(PL_ERROR, "ProtoDispatcher::UpdateStreamNotification(DISABLE_OUTPUT) KeventChange() error!\n");
                return false;
            }
            stream.UnsetNotifyFlag(Stream::NOTIFY_OUTPUT);
            break;
        case DISABLE_ALL:
            // Best effort: failures are only warned about
            if (stream.IsInput() && !KeventChange(stream.GetInputHandle(), EVFILT_READ, EV_DISABLE, &stream))
            {
                PLOG(PL_WARN, "ProtoDispatcher::UpdateStreamNotification(DISABLE_ALL) KeventChange(EVFILT_READ) error!\n");
            }
            if (stream.IsOutput() && !KeventChange(stream.GetOutputHandle(), EVFILT_WRITE, EV_DISABLE, &stream))
            {
                PLOG(PL_WARN, "ProtoDispatcher::UpdateStreamNotification(DISABLE_ALL) KeventChange(EVFILT_WRITE) error!\n");
            }
            stream.ClearNotifyFlags();
            break;
    }
    return true;
}
// Submits one change (ident, filter, flags, udata) to the dispatcher's
// kqueue, creating the kqueue on first use.
// When the change disables an event (EV_DISABLE), the first "wait_status"
// entries of kevent_array are scanned and every entry matching the same
// ident and filter is overwritten (ident 0, filter EVFILT_USER) so it no
// longer refers to the disabled event.
// Returns false if the kqueue cannot be created or kevent() fails.
//
// The scan advances "kep" on every iteration; without the increment only
// the first array entry was ever examined.
bool ProtoDispatcher::KeventChange(uintptr_t ident, int16_t filter, uint16_t flags, void* udata)
{
    if (-1 == kevent_queue)
    {
        if (-1 == (kevent_queue = kqueue()))
        {
            PLOG(PL_ERROR, "ProtoDispatcher::KeventChange() kqueue() error: %s\n", GetErrorString());
            return false;
        }
    }
    struct kevent kvt;
    kvt.ident = ident;
    kvt.filter = filter;
    kvt.flags = flags;
    kvt.fflags = 0;
    kvt.data = 0;
    kvt.udata = udata;
    if (0 != (flags & EV_DISABLE))
    {
        // Neutralize already-retrieved events for the item being disabled
        struct kevent* kep = kevent_array;
        for (int i = 0; i < wait_status; i++)
        {
            if ((ident == kep->ident) && (filter == kep->filter))
            {
                kep->ident = 0;
                kep->filter = EVFILT_USER;
            }
            kep++;
        }
    }
    if (kevent(kevent_queue, &kvt, 1, NULL, 0, NULL) < 0)
    {
        PLOG(PL_ERROR, "ProtoDispatcher::KeventChange() kevent() error: %s\n", GetErrorString());
        return false;
    }
    else
    {
        return true;
    }
}
#elif USE_EPOLL
// epoll flavor: applies "cmd" through EpollChange() and then updates the
// stream's notify flags. When the stream's input and output handles are the
// same descriptor and the other direction is active, the registration is
// modified (EPOLL_CTL_MOD) rather than added or deleted.
// Returns false, leaving the flags untouched, if EpollChange() fails.
bool ProtoDispatcher::UpdateStreamNotification(Stream& stream, NotificationCommand cmd)
{
    switch (cmd)
    {
        case ENABLE_INPUT:
        {
            ASSERT(!stream.IsInput());
            bool changed;
            if (stream.IsOutput() && (stream.GetInputHandle() == stream.GetOutputHandle()))
                changed = EpollChange(stream.GetInputHandle(), EPOLLIN | EPOLLOUT, EPOLL_CTL_MOD, &stream);
            else
                changed = EpollChange(stream.GetInputHandle(), EPOLLIN, EPOLL_CTL_ADD, &stream);
            if (!changed)
            {
                PLOG(PL_ERROR, "ProtoDispatcher::UpdateStreamNotification(ENABLE_INPUT) error: EpollChange() failed!\n");
                return false;
            }
            stream.SetNotifyFlag(Stream::NOTIFY_INPUT);
            break;
        }
        case DISABLE_INPUT:
        {
            ASSERT(stream.IsInput());
            bool changed;
            if (stream.IsOutput() && (stream.GetInputHandle() == stream.GetOutputHandle()))
                changed = EpollChange(stream.GetInputHandle(), EPOLLOUT, EPOLL_CTL_MOD, &stream);
            else
                changed = EpollChange(stream.GetInputHandle(), 0, EPOLL_CTL_DEL, NULL);
            if (!changed)
            {
                PLOG(PL_ERROR, "ProtoDispatcher::UpdateStreamNotification(DISABLE_INPUT) error: EpollChange() failed!\n");
                return false;
            }
            stream.UnsetNotifyFlag(Stream::NOTIFY_INPUT);
            break;
        }
        case ENABLE_OUTPUT:
        {
            ASSERT(!stream.IsOutput());
            bool changed;
            if (stream.IsInput() && (stream.GetInputHandle() == stream.GetOutputHandle()))
                changed = EpollChange(stream.GetOutputHandle(), EPOLLIN | EPOLLOUT, EPOLL_CTL_MOD, &stream);
            else
                changed = EpollChange(stream.GetOutputHandle(), EPOLLOUT, EPOLL_CTL_ADD, &stream);
            if (!changed)
            {
                PLOG(PL_ERROR, "ProtoDispatcher::UpdateStreamNotification(ENABLE_OUTPUT) error: EpollChange() failed!\n");
                return false;
            }
            stream.SetNotifyFlag(Stream::NOTIFY_OUTPUT);
            break;
        }
        case DISABLE_OUTPUT:
        {
            ASSERT(stream.IsOutput());
            bool changed;
            if (stream.IsInput() && (stream.GetInputHandle() == stream.GetOutputHandle()))
                changed = EpollChange(stream.GetOutputHandle(), EPOLLIN, EPOLL_CTL_MOD, &stream);
            else
                changed = EpollChange(stream.GetOutputHandle(), 0, EPOLL_CTL_DEL, NULL);
            if (!changed)
            {
                PLOG(PL_ERROR, "ProtoDispatcher::UpdateStreamNotification(DISABLE_OUTPUT) error: EpollChange() failed!\n");
                return false;
            }
            stream.UnsetNotifyFlag(Stream::NOTIFY_OUTPUT);
            break;
        }
        case DISABLE_ALL:
        {
            ASSERT(stream.IsInput() || stream.IsOutput());
            if (stream.GetInputHandle() == stream.GetOutputHandle())
            {
                // One shared descriptor: a single delete covers both
                if (!EpollChange(stream.GetInputHandle(), 0, EPOLL_CTL_DEL, NULL))
                {
                    PLOG(PL_ERROR, "ProtoDispatcher::UpdateStreamNotification(DISABLE_ALL) error: EpollChange() failed!\n");
                    return false;
                }
            }
            else
            {
                if (stream.IsInput() &&
                    !EpollChange(stream.GetInputHandle(), 0, EPOLL_CTL_DEL, NULL))
                {
                    PLOG(PL_ERROR, "ProtoDispatcher::UpdateStreamNotification(DISABLE_ALL) error: EpollChange(input) failed!\n");
                    return false;
                }
                if (stream.IsOutput() &&
                    !EpollChange(stream.GetOutputHandle(), 0, EPOLL_CTL_DEL, NULL))
                {
                    PLOG(PL_ERROR, "ProtoDispatcher::UpdateStreamNotification(DISABLE_ALL) error: EpollChange(output) failed!\n");
                    return false;
                }
            }
            stream.ClearNotifyFlags();
            break;
        }
    }
    return true;
}
bool ProtoDispatcher::EpollChange(int fd, int events, int op, void* udata)
{
if (-1 == epoll_fd)
{
if (-1 == (epoll_fd = epoll_create1(0)))
{
PLOG(PL_ERROR, "ProtoDispatcher::UpdateStreamNotification() epoll_create() error: %s\n",
GetErrorString());
return false;
}
}
switch (op)
{
case EPOLL_CTL_MOD:
{
struct epoll_event* evp = epoll_event_array;
for (int i = 0; i < wait_status; i++)
{
if (evp->data.ptr == udata)
evp->events &= events; evp++;
}
break;
}
case EPOLL_CTL_DEL:
{
struct epoll_event* evp = epoll_event_array;
for (int i = 0; i < wait_status; i++)
{
if (evp->data.ptr == udata)
{
evp->events = 0; evp->data.ptr = NULL; }
evp++;
}
break;
}
default:
break;
} struct epoll_event event;
event.events = events;
event.data.ptr = udata;
if (-1 == epoll_ctl(epoll_fd, op, fd, &event))
{
#ifdef USE_TIMERFD
PLOG(PL_ERROR, "ProtoDispatcher::UpdateStreamNotification() epoll_ctl() error: %s (timer_fd:%d epoll_fd:%d)\n",
GetErrorString(), timer_stream.GetDescriptor(), epoll_fd);
#else
PLOG(PL_ERROR, "ProtoDispatcher::UpdateStreamNotification() epoll_ctl() error: %s (epoll_fd:%d)\n",
GetErrorString(), epoll_fd);
#endif
return false;
}
return true;
}
#else
#error "undefined async i/o mechanism"
#endif
// Installs the "break" mechanism that is later triggered by SetBreak().
//  - Non-kqueue builds: opens break_event and, for epoll builds, registers
//    its descriptor (tagged with &break_stream) for EPOLLIN.
//  - kqueue builds: registers an EVFILT_USER event with identifier 1.
// Returns true on success, false (after logging) on failure.
// Each preprocessor directive is kept on its own line; tokens that share a
// line with "#endif" are discarded by the preprocessor, which would silently
// drop the "#else" branch and the final "return true".
bool ProtoDispatcher::InstallBreak()
{
#ifndef USE_KQUEUE
    if (!break_event.Open())
    {
        PLOG(PL_ERROR, "ProtoDispatcher::InstallBreak() break_event.Open() error: %s\n", GetErrorString());
        return false;
    }
#ifdef USE_EPOLL
    if (!EpollChange(break_event.GetDescriptor(), EPOLLIN, EPOLL_CTL_ADD, &break_stream))
    {
        PLOG(PL_ERROR, "ProtoDispatcher::InstallBreak() error: EpollChange() failed!\n");
        break_event.Close();  // don't leave the event open if it can't be watched
        return false;
    }
#endif  // USE_EPOLL
#else
    if (!KeventChange(1, EVFILT_USER, EV_ADD | EV_CLEAR, NULL))
    {
        PLOG(PL_ERROR, "ProtoDispatcher::InstallBreak() KeventChange(EVFILT_USER) error: %s\n", GetErrorString());
        return false;
    }
#endif  // if/else !USE_KQUEUE
    return true;
}
// Triggers the break mechanism installed by InstallBreak().
//  - Non-kqueue builds: sets break_event.
//  - kqueue builds: posts NOTE_TRIGGER on the EVFILT_USER event (identifier 1).
// Returns true on success, false (after logging) on failure.
// The trailing "return true" is on its own line so that it is not swallowed
// by the "#endif" directive.
bool ProtoDispatcher::SetBreak()
{
#ifndef USE_KQUEUE
    if (!break_event.Set())
    {
        PLOG(PL_ERROR, "ProtoDispatcher::SetBreak() break_event.Set() error: %s\n", GetErrorString());
        return false;
    }
#else
    struct kevent kev;
    EV_SET(&kev, 1, EVFILT_USER, 0, NOTE_TRIGGER, 0, NULL);
    if (-1 == kevent(kevent_queue, &kev, 1, NULL, 0, NULL))
    {
        PLOG(PL_ERROR, "ProtoDispatcher::SetBreak() kevent() error: %s\n", GetErrorString());
        return false;
    }
#endif  // if/else !USE_KQUEUE
    return true;
}
void ProtoDispatcher::RemoveBreak()
{
#ifndef USE_KQUEUE
if (INVALID_DESCRIPTOR != break_stream.GetDescriptor())
{
#ifdef USE_EPOLL
if (!EpollChange(break_stream.GetDescriptor(), EPOLLIN, EPOLL_CTL_DEL, &break_stream))
{
PLOG(PL_ERROR, "ProtoDispatcher::RemoveBreak() error: EpollChange() failed!\n");
}
#endif break_event.Close();
}
#else
struct kevent kev;
EV_SET(&kev, 1, EVFILT_USER, EV_DELETE, 0, 0, NULL);
if (-1 == kevent(kevent_queue, &kev, 1, NULL, 0, NULL))
PLOG(PL_ERROR, "ProtoDispatcher::RemoveBreak() kevent() error: %s\n", GetErrorString());
#endif }
void ProtoDispatcher::Wait()
{
#if defined(USE_KQUEUE) || defined(HAVE_PSELECT) || defined(USE_TIMERFD)
#define USE_TIMESPEC 1
#endif
#ifdef USE_SELECT
int maxDescriptor = -1;
FD_ZERO(&input_set);
FD_ZERO(&output_set);
#endif
#ifdef USE_TIMESPEC
struct timespec timeout;
struct timespec* timeoutPtr = NULL;
#else
struct timeval timeout;
struct timeval* timeoutPtr = NULL;
#endif double timerDelay = timer_delay;
if (timerDelay < 0.0)
{
#ifdef USE_TIMERFD
timeout.tv_sec = 0;
timeout.tv_nsec = 0;
#endif
}
else
{
#if defined(MACOSX)
#define PRECISE_THRESHOLD 1.0e-05
#elif defined(LINUX)
#ifdef USE_TIMERFD
#define PRECISE_THRESHOLD 2.0e-05
#else
#define PRECISE_THRESHOLD 2.0e-03
#endif #else
#define PRECISE_THRESHOLD 2.0e-03
#endif
if (precise_timing && (timerDelay < PRECISE_THRESHOLD)) timerDelay = 0.0;
timeout.tv_sec = (unsigned long)timerDelay;
#ifdef USE_TIMESPEC
timeout.tv_nsec =
(unsigned long)(1.0e+09 * (timerDelay - (double)timeout.tv_sec));
#else
timeout.tv_usec =
(suseconds_t)(1.0e+06 * (timerDelay - (double)timeout.tv_sec));
#endif timeoutPtr = &timeout;
#ifdef USE_TIMERFD
if ((0 != timeout.tv_nsec) || (0 != timeout.tv_sec))
{
struct itimerspec timerSpec;
timerSpec.it_interval.tv_sec = timerSpec.it_interval.tv_nsec = 0; timerSpec.it_value = timeout;
if (0 == timerfd_settime(timer_stream.GetDescriptor(), 0, &timerSpec, 0))
{
#ifdef USE_SELECT
FD_SET(timer_stream.GetDescriptor(), &input_set);
if (timer_stream.GetDescriptor() > maxDescriptor)
maxDescriptor = timer_stream.GetDescriptor();
#endif timeoutPtr = NULL; }
else
{
PLOG(PL_ERROR, "ProtoDispatcher::Wait() timerfd_settime() error: %s\n", GetErrorString());
}
}
#endif }
#if defined(USE_SELECT)
if (IsThreaded())
{
FD_SET(break_event.GetDescriptor(), &input_set);
if (break_event.GetDescriptor() > maxDescriptor)
maxDescriptor = break_event.GetDescriptor();
}
StreamTable::Iterator iterator(stream_table);
Stream* stream;
while (NULL != (stream = iterator.GetNextItem()))
{
if (stream->IsInput())
{
Descriptor descriptor = stream->GetInputHandle();
FD_SET(descriptor, &input_set);
if (descriptor > maxDescriptor) maxDescriptor = descriptor;
}
if (stream->IsOutput())
{
Descriptor descriptor = stream->GetOutputHandle();
FD_SET(descriptor, &output_set);
if (descriptor > maxDescriptor) maxDescriptor = descriptor;
}
}
#ifdef HAVE_PSELECT
wait_status = pselect(maxDescriptor+1,
(fd_set*)&input_set,
(fd_set*)&output_set,
(fd_set*) NULL,
timeoutPtr,
(sigset_t*)NULL);;
#else
wait_status = select(maxDescriptor+1,
(fd_set*)&input_set,
(fd_set*)&output_set,
(fd_set*) NULL,
(timeval*)timeoutPtr);
#endif
#elif defined(USE_EPOLL)
if (-1 == epoll_fd)
{
if (-1 == (epoll_fd = epoll_create1(0)))
{
PLOG(PL_ERROR, "ProtoDispatcher::Wait() epoll_create() error: %s\n",
GetErrorString());
return;
}
}
wait_status = epoll_wait(epoll_fd, epoll_event_array, EPOLL_ARRAY_SIZE, (NULL != timeoutPtr) ? (int)(timerDelay*1000.0) : -1);
#elif defined(USE_KQUEUE)
if (-1 == kevent_queue)
{
if (-1 == (kevent_queue = kqueue()))
{
PLOG(PL_ERROR, "ProtoDispatcher::Wait() kqueue() error: %s\n", GetErrorString());
wait_status = WAIT_ERROR;
return;
}
}
wait_status = kevent(kevent_queue, NULL, 0, kevent_array, KEVENT_ARRAY_SIZE, timeoutPtr);
#else
#error "undefined async i/o mechanism"
#endif
}
void ProtoDispatcher::Dispatch()
{
#if defined(USE_SELECT)
switch (wait_status)
{
case -1:
if (EINTR != errno)
PLOG(PL_ERROR, "ProtoDispatcher::Dispatch() select() error: %s\n", GetErrorString());
break;
case 0:
OnSystemTimeout();
break;
default:
StreamTable::Iterator iterator(stream_table);
Stream* stream;
while (NULL != (stream = iterator.GetNextItem()))
{
switch (stream->GetType())
{
case Stream::CHANNEL:
{
ProtoChannel& theChannel = static_cast<ChannelStream*>(stream)->GetChannel();
if (stream->IsInput())
{
if (FD_ISSET(theChannel.GetInputEventHandle(), &input_set))
theChannel.OnNotify(ProtoChannel::NOTIFY_INPUT);
}
if (stream->IsOutput())
{
if (FD_ISSET(theChannel.GetOutputEventHandle(), &output_set))
theChannel.OnNotify(ProtoChannel::NOTIFY_OUTPUT);
}
break;
}
case Stream::SOCKET:
{
ProtoSocket& theSocket = static_cast<SocketStream*>(stream)->GetSocket();
Descriptor descriptor = theSocket.GetHandle();
if (stream->IsInput() && FD_ISSET(descriptor, &input_set))
theSocket.OnNotify(ProtoSocket::NOTIFY_INPUT);
if (stream->IsOutput() && FD_ISSET(descriptor, &output_set))
theSocket.OnNotify(ProtoSocket::NOTIFY_OUTPUT);
break;
}
case Stream::GENERIC:
{
Descriptor descriptor = static_cast<GenericStream*>(stream)->GetDescriptor();
if (stream->IsInput() && FD_ISSET(descriptor, &input_set))
static_cast<GenericStream*>(stream)->OnEvent(EVENT_INPUT);
if (stream->IsOutput() && FD_ISSET(descriptor, &output_set))
static_cast<GenericStream*>(stream)->OnEvent(EVENT_OUTPUT);
break;
}
case Stream::TIMER:
break;
case Stream::EVENT:
{
EventStream* eventStream = static_cast<EventStream*>(stream);
if (FD_ISSET(eventStream->GetDescriptor(), &input_set))
{
ProtoEvent& theEvent = eventStream->GetEvent();
if (theEvent.GetAutoReset()) theEvent.Reset();
theEvent.OnNotify();
}
break;
}
} }
if ((INVALID_DESCRIPTOR != break_stream.GetDescriptor()) &&
FD_ISSET(break_stream.GetDescriptor(), &input_set))
{
break_event.Reset(); }
#ifdef USE_TIMERFD
if (timer_stream.GetDescriptor() != INVALID_DESCRIPTOR &&
FD_ISSET(timer_stream.GetDescriptor(), &input_set))
{
uint64_t expirations = 0;
if (read(timer_stream.GetDescriptor(), &expirations, sizeof(expirations)) < 0)
PLOG(PL_ERROR, "ProtoDispatcher::Dispatch() read(timer_fd) error: %s\n", GetErrorString());
}
#endif OnSystemTimeout();
break;
}
#elif defined(USE_EPOLL)
switch(wait_status)
{
case WAIT_ERROR:
if (EINTR != errno)
PLOG(PL_ERROR, "ProtoDispatcher::Dispatch() epoll_wait() error: %s\n", GetErrorString());
break;
case 0:
OnSystemTimeout();
break;
default:
struct epoll_event* evp = epoll_event_array;
for (int i = 0; i < wait_status; i++)
{
if (NULL != evp->data.ptr)
{
Stream* stream = (Stream*)evp->data.ptr;
switch (stream->GetType())
{
case Stream::CHANNEL:
{
ProtoChannel& channel = static_cast<ChannelStream*>(stream)->GetChannel();
if (0 != (EPOLLIN & evp->events))
{
if (stream->IsInput())
channel.OnNotify(ProtoChannel::NOTIFY_INPUT);
}
if (0 != (EPOLLOUT & evp->events))
{
if (stream->IsOutput())
channel.OnNotify(ProtoChannel::NOTIFY_OUTPUT);
}
if (0 != (EPOLLERR & evp->events))
{
PLOG(PL_ERROR, "ProtoDispatcher::Dispatch() ProtoChannel epoll event error: %s\n", GetErrorString());
if (stream->IsInput())
channel.OnNotify(ProtoChannel::NOTIFY_INPUT);
else if (stream->IsOutput())
channel.OnNotify(ProtoChannel::NOTIFY_OUTPUT);
}
break;
}
case Stream::SOCKET:
{
ProtoSocket& socket = static_cast<SocketStream*>(stream)->GetSocket();
if (0 != (EPOLLIN & evp->events))
{
if (stream->IsInput())
socket.OnNotify(ProtoSocket::NOTIFY_INPUT);
}
if (0 != (EPOLLOUT & evp->events))
{
if (stream->IsOutput())
socket.OnNotify(ProtoSocket::NOTIFY_OUTPUT);
}
if (0 != (EPOLLERR & evp->events))
{
socket.OnNotify(ProtoSocket::NOTIFY_ERROR);
}
break;
}
case Stream::GENERIC:
{
if (0 != (EPOLLIN & evp->events))
{
if (stream->IsInput())
static_cast<GenericStream*>(stream)->OnEvent(EVENT_INPUT);
}
if (0 != (EPOLLOUT & evp->events))
{
if (stream->IsOutput())
static_cast<GenericStream*>(stream)->OnEvent(EVENT_OUTPUT);
}
if (0 != (EPOLLERR & evp->events))
{
if (stream->IsInput())
static_cast<GenericStream*>(stream)->OnEvent(EVENT_INPUT);
else if (stream->IsOutput())
static_cast<GenericStream*>(stream)->OnEvent(EVENT_OUTPUT);
}
break;
}
case Stream::TIMER:
{
#ifdef USE_TIMERFD
uint64_t expirations = 0;
if (read(static_cast<TimerStream*>(stream)->GetDescriptor(), &expirations, sizeof(expirations)) < 0)
PLOG(PL_ERROR, "ProtoDispatcher::Dispatch() read(timer_fd) error: %s\n", GetErrorString());
#endif break;
}
case Stream::EVENT:
{
if (0 != (EPOLLIN & evp->events))
{
EventStream* eventStream = static_cast<EventStream*>(stream);
ProtoEvent& event = eventStream->GetEvent();
if (event.GetAutoReset()) event.Reset();
event.OnNotify();
}
break;
}
} }
else
{
}
evp++;
} OnSystemTimeout();
break;
}
#elif defined(USE_KQUEUE)
switch (wait_status)
{
case WAIT_ERROR:
if (EINTR != errno)
PLOG(PL_ERROR, "ProtoDispatcher::Dispatch() kevent() error: %s\n", GetErrorString());
break;
case 0:
OnSystemTimeout();
break;
default:
{
struct kevent* kep = kevent_array;
for (int i = 0; i < wait_status; i++)
{
if (0 != (EV_ERROR & kep->flags))
{
PLOG(PL_ERROR, "ProtoDispatcher::Dispatch() kqueue event error: %s\n", GetErrorString((int)(kep->data)));
kep++;
continue;
}
Stream* stream = (Stream*)kep->udata;
switch (kep->filter)
{
case EVFILT_READ:
ASSERT(NULL != stream);
if (!stream->IsInput()) break;
switch (stream->GetType())
{
case Stream::CHANNEL:
static_cast<ChannelStream*>(stream)->GetChannel().OnNotify(ProtoChannel::NOTIFY_INPUT);
break;
case Stream::SOCKET:
static_cast<SocketStream*>(stream)->GetSocket().OnNotify(ProtoSocket::NOTIFY_INPUT);
break;
case Stream::GENERIC:
static_cast<GenericStream*>(stream)->OnEvent(EVENT_INPUT);
break;
case Stream::TIMER:
break;
case Stream::EVENT:
EventStream* eventStream = static_cast<EventStream*>(stream);
ProtoEvent& theEvent = eventStream->GetEvent();
if (theEvent.GetAutoReset()) theEvent.Reset();
theEvent.OnNotify();
break;
}
break;
case EVFILT_WRITE:
ASSERT(NULL != stream);
if (!stream->IsOutput()) break;
switch (stream->GetType())
{
case Stream::CHANNEL:
static_cast<ChannelStream*>(stream)->GetChannel().OnNotify(ProtoChannel::NOTIFY_OUTPUT);
break;
case Stream::SOCKET:
static_cast<SocketStream*>(stream)->GetSocket().OnNotify(ProtoSocket::NOTIFY_OUTPUT);
break;
case Stream::GENERIC:
static_cast<GenericStream*>(stream)->OnEvent(EVENT_OUTPUT);
break;
case Stream::TIMER:
case Stream::EVENT:
break;
}
break;
case EVFILT_USER:
break;
case EVFILT_TIMER:
default:
PLOG(PL_ERROR, "ProtoDispatcher::Dispatch() error: unexpected event filter type %d\n", kep->filter);
break;
} kep++;
}
OnSystemTimeout(); break;
}
} #else
#error "undefined async i/o mechanism"
#endif wait_status = 0; }
#endif
#ifdef WIN32
// WIN32: opens break_event and adds its handle (tagged with break_stream) to
// the array of waited-on handles.  Returns true on success; on failure the
// problem is logged, the event is closed again if it had been opened, and
// false is returned.
bool ProtoDispatcher::InstallBreak()
{
    ASSERT(INVALID_DESCRIPTOR == break_event.GetDescriptor());
    if (break_event.Open())
    {
        const int slot = Win32AddStream(break_stream, break_event.GetDescriptor());
        if (slot >= 0)
        {
            break_stream.SetIndex(slot);
            return true;
        }
        PLOG(PL_ERROR, "ProtoDispatcher::InstallBreak() error: add break_stream failed!\n");
        break_event.Close();
    }
    else
    {
        PLOG(PL_ERROR, "ProtoDispatcher::InstallBreak() CreateEvent() error\n");
    }
    return false;
}
// WIN32: sets break_event.  Returns the result of the Set() call, logging an
// error when it fails.
bool ProtoDispatcher::SetBreak()
{
    const bool signaled = break_event.Set();
    if (!signaled)
        PLOG(PL_ERROR, "ProtoDispatcher::SetBreak() break_event.Set() error: %s\n", GetErrorString());
    return signaled;
}
// WIN32: if break_stream has a valid descriptor, removes it from the waited-on
// handle array, marks it as no longer indexed, and closes break_event.
void ProtoDispatcher::RemoveBreak()
{
    if (INVALID_DESCRIPTOR == break_stream.GetDescriptor())
        return;  // nothing installed

    Win32RemoveStream(break_stream.GetIndex());
    break_stream.SetIndex(-1);
    break_event.Close();
}
bool ProtoDispatcher::Win32Init()
{
HINSTANCE theInstance = GetModuleHandle(NULL);
WNDCLASS cl;
cl.style = CS_HREDRAW | CS_VREDRAW;
cl.lpfnWndProc = MessageHandler;
cl.cbClsExtra = 0;
cl.cbWndExtra = 0;
cl.hInstance = theInstance;
cl.hIcon = NULL;
cl.hCursor = NULL;
cl.hbrBackground = NULL;
cl.lpszMenuName = NULL;
cl.lpszClassName = _T("ProtoDispatcher");
if (!RegisterClass(&cl))
{
LPVOID lpMsgBuf;
FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER |
FORMAT_MESSAGE_FROM_SYSTEM |
FORMAT_MESSAGE_IGNORE_INSERTS,
NULL, GetLastError(),
MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), (LPTSTR) &lpMsgBuf, 0, NULL );
MessageBox( NULL, (LPCTSTR)lpMsgBuf, (LPCTSTR)"Error", MB_OK | MB_ICONINFORMATION );
LocalFree(lpMsgBuf);
PLOG(PL_ERROR, "ProtoDispatcher::Win32Init() Error registering message window class!\n");
return false;
}
HWND parent = NULL;
#ifdef HWND_MESSAGE
parent = HWND_MESSAGE;
#endif msg_window = CreateWindowEx(0, _T("ProtoDispatcher"), _T("ProtoDispatcher"), WS_OVERLAPPED, CW_USEDEFAULT, CW_USEDEFAULT, 0, 0, parent, NULL, theInstance, this); if (NULL != msg_window)
{
ShowWindow(msg_window, SW_HIDE);
return true;
}
else
{
PLOG(PL_ERROR, "ProtoDispatcher::Win32Init() Error creating message window: %s (%d)\n",
GetErrorString(), GetLastError());
return false;
}
}
// WIN32: destroys the message window created by Win32Init(), if any, and
// clears msg_window.
void ProtoDispatcher::Win32Cleanup()
{
    if (NULL == msg_window)
        return;  // no window to tear down

    DestroyWindow(msg_window);
    msg_window = NULL;
}
// WIN32: enables or disables input/output notification for "stream" by adding
// or removing its handle(s) in the array of waited-on handles.  A stream's
// "index" is the array slot of its input handle and its "outdex" the slot of
// its output handle (-1 when not installed).
// Returns false only when a handle could not be added; true otherwise.
//
// DISABLE_ALL fixes relative to the previous revision:
//  - after removing the output slot the *outdex* is reset (it used to reset
//    the index a second time, leaving a stale outdex behind);
//  - the higher-numbered slot is removed first, so that the array compaction
//    done by Win32RemoveStream() cannot shift the slot that is still to be
//    removed.
bool ProtoDispatcher::UpdateStreamNotification(Stream& stream, NotificationCommand cmd)
{
    switch (cmd)
    {
        case ENABLE_INPUT:
        {
            ASSERT(stream.GetIndex() < 0);
            int index = Win32AddStream(stream, stream.GetInputHandle());
            if (index < 0)
            {
                PLOG(PL_ERROR, "ProtoDispatcher::UpdateStreamNotification() error adding input stream\n");
                return false;
            }
            stream.SetIndex(index);
            stream.SetNotifyFlag(Stream::NOTIFY_INPUT);
            break;
        }
        case DISABLE_INPUT:
        {
            ASSERT(stream.GetIndex() >= 0);
            Win32RemoveStream(stream.GetIndex());
            stream.SetIndex(-1);
            stream.UnsetNotifyFlag(Stream::NOTIFY_INPUT);
            break;
        }
        case ENABLE_OUTPUT:
        {
            ASSERT(stream.GetOutdex() < 0);
            int outdex = Win32AddStream(stream, stream.GetOutputHandle());
            if (outdex < 0)
            {
                PLOG(PL_ERROR, "ProtoDispatcher::UpdateStreamNotification() error adding output stream\n");
                return false;
            }
            stream.SetOutdex(outdex);
            stream.SetNotifyFlag(Stream::NOTIFY_OUTPUT);
            break;
        }
        case DISABLE_OUTPUT:
        {
            ASSERT(stream.GetOutdex() >= 0);
            Win32RemoveStream(stream.GetOutdex());
            stream.SetOutdex(-1);
            stream.UnsetNotifyFlag(Stream::NOTIFY_OUTPUT);
            break;
        }
        case DISABLE_ALL:
        {
            // If the output slot lies above the input slot, remove it first so
            // the input slot number is still valid afterwards
            if ((stream.GetOutdex() >= 0) && (stream.GetOutdex() > stream.GetIndex()))
            {
                Win32RemoveStream(stream.GetOutdex());
                stream.SetOutdex(-1);
            }
            if (stream.GetIndex() >= 0)
            {
                Win32RemoveStream(stream.GetIndex());
                stream.SetIndex(-1);
            }
            // Output slot below the (now removed) input slot, if any
            if (stream.GetOutdex() >= 0)
            {
                Win32RemoveStream(stream.GetOutdex());
                stream.SetOutdex(-1);
            }
            stream.ClearNotifyFlags();
            break;
        }
    }
    return true;
}
// WIN32: appends "handle" and its owning "stream" to the parallel
// stream_handles_array / stream_ptrs_array, growing the arrays first when the
// next slot is at or beyond stream_array_size.
// Returns the slot used, or -1 if the arrays could not be grown.
int ProtoDispatcher::Win32AddStream(Stream& stream, HANDLE handle)
{
    const DWORD slot = stream_count;  // new entries always go at the end
    if ((slot >= stream_array_size) && !Win32IncreaseStreamArraySize())
    {
        PLOG(PL_ERROR, "ProtoDispatcher::AddStream() error increasing array size\n");
        return -1;
    }
    stream_ptrs_array[slot] = &stream;
    stream_handles_array[slot] = handle;
    stream_count++;
    return slot;
}
void ProtoDispatcher::Win32RemoveStream(int index)
{
ASSERT((index >= 0) && (index < (int)stream_count));
unsigned int moveCount = stream_count - index - 1;
memmove(stream_handles_array+index, stream_handles_array+index+1,
moveCount*sizeof(HANDLE));
memmove(stream_ptrs_array+index, stream_ptrs_array+index+1,
moveCount*sizeof(Stream*));
stream_count--;
for (; index < (int)stream_count; index++)
stream_ptrs_array[index]->SetIndex(index);
}
// WIN32: grows the parallel stream_handles_array / stream_ptrs_array to twice
// their current capacity (or to DEFAULT_STREAM_ARRAY_SIZE when empty),
// preserving the first stream_count entries.
// Returns true on success, false (after logging) if allocation failed.
//
// Fix: the new capacity is now recorded in stream_array_size.  Previously it
// was never updated, so every add re-allocated the arrays at the default size
// and writes ran past the end once stream_count reached that size.
//
// NOTE(review): the NULL checks only take effect on toolchains where
// operator new[] returns NULL on failure; with a throwing operator new[] a
// failure of the second allocation propagates as an exception instead.
bool ProtoDispatcher::Win32IncreaseStreamArraySize()
{
    unsigned int newSize = (0 != stream_array_size) ? (2 * stream_array_size) : DEFAULT_STREAM_ARRAY_SIZE;
    HANDLE* hPtr = new HANDLE[newSize];
    Stream** iPtr = new Stream*[newSize];
    if ((NULL != hPtr) && (NULL != iPtr))
    {
        if (0 != stream_count)
        {
            memcpy(hPtr, stream_handles_array, stream_count*sizeof(HANDLE));
            memcpy(iPtr, stream_ptrs_array, stream_count*sizeof(Stream*));
        }
        if (NULL != stream_handles_array)
        {
            delete[] stream_handles_array;
            delete[] stream_ptrs_array;
        }
        stream_handles_array = hPtr;
        stream_ptrs_array = iPtr;
        stream_array_size = newSize;  // remember the new capacity
        return true;
    }
    else
    {
        PLOG(PL_ERROR, "ProtoDispatcher::Win32IncreaseStreamArraySize() new stream_array error: %s\n",
                       GetErrorString());
        if (NULL != hPtr) delete[] hPtr;
        if (NULL != iPtr) delete[] iPtr;
        return false;
    }
}
void ProtoDispatcher::Wait()
{
double timerDelay = timer_delay;
if (!ready_stream_list.IsEmpty()) timerDelay = 0.0;
#ifdef USE_WAITABLE_TIMER
DWORD msec = INFINITE;
if (timerDelay < 0.0)
{
if (timer_active)
{
CancelWaitableTimer(timer_stream.GetDescriptor());
timer_active = false;
}
if (timer_stream.GetIndex() >= 0)
{
Win32RemoveStream(timer_stream.GetIndex());
timer_stream.SetIndex(-1);
}
}
else if (0.0 == timerDelay)
{
if (timer_active)
{
CancelWaitableTimer(timer_stream.GetDescriptor());
timer_active = false;
}
if (timer_stream.GetIndex() >= 0)
{
Win32RemoveStream(timer_stream.GetIndex());
timer_stream.SetIndex(-1);
}
msec = 0;
}
else
{
LARGE_INTEGER dueTime;
dueTime.QuadPart = (LONGLONG)(-(timerDelay * 1.0e+07)); LONG period = 0;
if (0 == SetWaitableTimer(timer_stream.GetDescriptor(), &dueTime, 0, NULL, NULL, FALSE))
{
PLOG(PL_ERROR, "ProtoDispatcher::Wait() SetWaitableTimer() error: %s\n", GetErrorString());
msec = (DWORD)(1000.0 * timerDelay);
if ((timerDelay > 0.0) && (0 == msec) && !precise_timing) msec = 1;
timer_active = false;
}
else
{
if (timer_stream.GetIndex() < 0)
{
int index = Win32AddStream(timer_stream, timer_stream.GetDescriptor());
if (index < 0)
{
PLOG(PL_ERROR, "ProtoDispatcher::Wait() error: unable to insert timer_stream\n");
msec = (DWORD)(1000.0 * timerDelay);
if ((timerDelay > 0.0) && (0 == msec) && !precise_timing) msec = 1;
timer_active = false;
}
else
{
timer_stream.SetIndex(index);
timer_active = true;
}
}
else
{
timer_active = true;
}
}
}
#else
DWORD msec = (timerDelay < 0.0) ? INFINITE : ((DWORD)(1000.0 * timerDelay));
if ((timerDelay > 0.0) && (0 == msec) && !precise_timing) msec = 1;
#endif
DWORD waitFlags = 0;
OSVERSIONINFO vinfo;
vinfo.dwOSVersionInfoSize = sizeof(OSVERSIONINFO);
GetVersionEx(&vinfo);
if ((VER_PLATFORM_WIN32_NT == vinfo.dwPlatformId) &&
((vinfo.dwMajorVersion > 4) ||
((vinfo.dwMajorVersion == 4) && (vinfo.dwMinorVersion > 0))))
waitFlags |= MWMO_INPUTAVAILABLE; #ifndef _WIN32_WCE
waitFlags |= MWMO_ALERTABLE;
#endif wait_status = MsgWaitForMultipleObjectsEx(stream_count, stream_handles_array, msec, QS_ALLINPUT, waitFlags);
}
// WIN32: acts on the result ("wait_status") of the preceding Wait() call.
//   WAIT_ERROR  - the system error text is logged
//   otherwise   - streams on ready_stream_list are notified first; then, if a
//                 waited-on handle was signaled, the owning stream is
//                 notified according to its type; if instead the message
//                 queue was signaled, one window message is processed
//                 (WM_QUIT stops the dispatcher and records the exit code)
// OnSystemTimeout() is invoked at the end in every case.
//
// Fixes relative to the previous revision:
//  - In the TIMER case "#endif" is on its own line; the "break" that shared
//    its line was discarded, letting a timer wake-up fall through into the
//    EVENT case and treat the timer stream as an EventStream.
//  - The error text buffer starts out empty so something well-defined is
//    logged even if FormatMessage() fails.
// NOTE(review): the error buffer is a char array passed as LPTSTR; in a
// UNICODE build FormatMessage() would write wide characters into it - confirm
// how this file is built.
void ProtoDispatcher::Dispatch()
{
    switch (wait_status)
    {
        case WAIT_ERROR:
        {
            char errorString[256];
            errorString[0] = '\0';
            errorString[255] = '\0';
            FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM |
                          FORMAT_MESSAGE_IGNORE_INSERTS,
                          NULL,
                          GetLastError(),
                          MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
                          (LPTSTR) errorString, 255, NULL);
            PLOG(PL_ERROR, "ProtoDispatcher::Dispatch() MsgWaitForMultipleObjectsEx() error: %s\n", errorString);
            break;
        }
        default:
        {
            // First service streams that were already flagged as ready
            StreamList::Iterator sit(ready_stream_list);
            Stream* nextStream;
            while (NULL != (nextStream = sit.GetNextItem()))
            {
                switch (nextStream->GetType())
                {
                    case Stream::CHANNEL:
                    {
                        ProtoChannel& theChannel = static_cast<ChannelStream*>(nextStream)->GetChannel();
                        if (theChannel.InputNotification())
                            theChannel.OnNotify(ProtoChannel::NOTIFY_INPUT);
                        if (theChannel.OutputNotification())
                            theChannel.OnNotify(ProtoChannel::NOTIFY_OUTPUT);
                        break;
                    }
                    case Stream::SOCKET:
                    {
                        ProtoSocket& theSocket = static_cast<SocketStream*>(nextStream)->GetSocket();
                        if (theSocket.InputNotification())
                            theSocket.OnNotify(ProtoSocket::NOTIFY_INPUT);
                        if (theSocket.OutputNotification())
                            theSocket.OnNotify(ProtoSocket::NOTIFY_OUTPUT);
                        break;
                    }
                    default:
                    {
                        ASSERT(0);
                        break;
                    }
                }
            }
            if (WAIT_TIMEOUT == wait_status)
            {
                break;
            }
            else if ((WAIT_OBJECT_0 <= wait_status) && (wait_status < (WAIT_OBJECT_0 + stream_count)))
            {
                // One of the waited-on handles was signaled
                unsigned int index = wait_status - WAIT_OBJECT_0;
                Stream* stream = stream_ptrs_array[index];
                switch (stream->GetType())
                {
                    case Stream::SOCKET:
                    {
                        ProtoSocket& theSocket = static_cast<SocketStream*>(stream)->GetSocket();
                        WSANETWORKEVENTS event;
                        if (0 == WSAEnumNetworkEvents(theSocket.GetHandle(), stream_handles_array[index], &event))
                        {
                            if (0 != (event.lNetworkEvents & (FD_READ | FD_ACCEPT)))
                                theSocket.OnNotify(ProtoSocket::NOTIFY_INPUT);
                            if (0 != (event.lNetworkEvents & FD_WRITE))
                                theSocket.OnNotify(ProtoSocket::NOTIFY_OUTPUT);
                            if (0 != (event.lNetworkEvents & FD_CLOSE))
                            {
                                theSocket.SetClosing(true);
                                if (0 == event.iErrorCode[FD_CLOSE_BIT])
                                    theSocket.OnNotify(ProtoSocket::NOTIFY_INPUT);
                                else
                                    theSocket.OnNotify(ProtoSocket::NOTIFY_ERROR);
                            }
                            if (0 != (event.lNetworkEvents & FD_CONNECT))
                            {
                                if (0 == event.iErrorCode[FD_CONNECT_BIT])
                                    theSocket.OnNotify(ProtoSocket::NOTIFY_OUTPUT);
                                else
                                    theSocket.OnNotify(ProtoSocket::NOTIFY_ERROR);
                            }
                            if (0 != (event.lNetworkEvents & FD_ADDRESS_LIST_CHANGE))
                            {
                                if (0 == event.iErrorCode[FD_ADDRESS_LIST_CHANGE_BIT])
                                    theSocket.OnNotify(ProtoSocket::NOTIFY_EXCEPTION);
                                else
                                    theSocket.OnNotify(ProtoSocket::NOTIFY_ERROR);
                            }
                        }
                        else
                        {
                            PLOG(PL_ERROR, "ProtoDispatcher::Dispatch() WSAEnumNetworkEvents() error\n");
                        }
                        break;
                    }
                    case Stream::CHANNEL:
                    {
                        // The signaled slot tells input from output
                        ProtoChannel& theChannel = static_cast<ChannelStream*>(stream)->GetChannel();
                        if (index == (unsigned int)stream->GetIndex())
                            theChannel.OnNotify(ProtoChannel::NOTIFY_INPUT);
                        else
                            theChannel.OnNotify(ProtoChannel::NOTIFY_OUTPUT);
                        break;
                    }
                    case Stream::TIMER:
                    {
#ifdef USE_WAITABLE_TIMER
                        ResetEvent(timer_stream.GetDescriptor());
                        timer_active = false;
#endif  // USE_WAITABLE_TIMER
                        break;
                    }
                    case Stream::EVENT:
                    {
                        EventStream* eventStream = static_cast<EventStream*>(stream);
                        ProtoEvent& event = eventStream->GetEvent();
                        if (event.GetAutoReset()) event.Reset();
                        // The dispatcher's own break event has no listener to notify
                        if (&break_stream != eventStream) event.OnNotify();
                        break;
                    }
                    case Stream::GENERIC:
                    {
                        if (stream->IsInput())
                            static_cast<GenericStream*>(stream)->OnEvent(EVENT_INPUT);
                        if (stream->IsOutput())
                            static_cast<GenericStream*>(stream)->OnEvent(EVENT_OUTPUT);
                        break;
                    }
                }
            }
            else if ((WAIT_OBJECT_0 + stream_count) == wait_status)
            {
                // The thread's message queue has input
                MSG msg;
                if (PeekMessage(&msg, NULL, 0, 0, PM_REMOVE))
                {
                    if (WM_QUIT != msg.message)
                    {
                        TranslateMessage(&msg);
                        DispatchMessage(&msg);
                    }
                    else
                    {
                        exit_code = msg.wParam;
                        run = false;
                        break;
                    }
                }
            }
            else
            {
                // Any other wait result is ignored here
            }
            break;
        }
    }
    OnSystemTimeout();
}
// Window procedure for the dispatcher's message window.
//   WM_CREATE  - stores the ProtoDispatcher pointer passed as the window's
//                creation parameter in the window's user data
//   WM_DESTROY - clears the dispatcher's msg_window and posts a quit message
//   WM_QUIT    - clears the dispatcher's "run" flag
// Those three return 0; every other message goes to DefWindowProc().
LRESULT CALLBACK ProtoDispatcher::MessageHandler(HWND hwnd, UINT message, WPARAM wParam, LPARAM lParam)
{
    if (WM_CREATE == message)
    {
        CREATESTRUCT* createInfo = reinterpret_cast<CREATESTRUCT*>(lParam);
        ProtoDispatcher* owner = static_cast<ProtoDispatcher*>(createInfo->lpCreateParams);
        SetWindowLongPtr(hwnd, GWLP_USERDATA, reinterpret_cast<LONG_PTR>(owner));
        return 0;
    }
    if (WM_DESTROY == message)
    {
        ProtoDispatcher* owner =
            reinterpret_cast<ProtoDispatcher*>(GetWindowLongPtr(hwnd, GWLP_USERDATA));
        owner->msg_window = NULL;
        PostQuitMessage(0);
        return 0;
    }
    if (WM_QUIT == message)
    {
        ProtoDispatcher* owner =
            reinterpret_cast<ProtoDispatcher*>(GetWindowLongPtr(hwnd, GWLP_USERDATA));
        owner->run = false;
        return 0;
    }
    return DefWindowProc(hwnd, message, wParam, lParam);
}
#endif
// Binds the controller to "theDispatcher", initializes both hand-off locks,
// and starts out holding lock_a (with use_lock_a set accordingly).
ProtoDispatcher::Controller::Controller(ProtoDispatcher& theDispatcher)
  : dispatcher(theDispatcher),
    use_lock_a(true)
{
    // Both locks must exist before either is taken
    ProtoDispatcher::Init(lock_a);
    ProtoDispatcher::Init(lock_b);

    // Initial state: lock_a held, lock_b free
    ProtoDispatcher::Lock(lock_a);
}
// Releases both hand-off locks and then destroys them.
ProtoDispatcher::Controller::~Controller()
{
    // Release first ...
    Unlock(lock_a);
    Unlock(lock_b);

    // ... then tear down
    Destroy(lock_a);
    Destroy(lock_b);
}
// Requests one dispatch cycle: releases the lock that is not currently "in
// use", signals that a dispatch is ready, then acquires the lock that is in
// use and flips use_lock_a for the next cycle.
// Returns false (after logging) if SignalDispatchReady() fails, in which case
// no lock is acquired and use_lock_a is left unchanged; true otherwise.
// NOTE(review): the Lock() call presumably blocks until OnDispatch() releases
// that lock - confirm against the lock implementation.
bool ProtoDispatcher::Controller::DoDispatch()
{
    // Free the "other" lock
    ProtoDispatcher::Unlock(use_lock_a ? lock_b : lock_a);

    if (!SignalDispatchReady())
    {
        PLOG(PL_ERROR, "ProtoDispatcher::Controller::DoDispatch()) SignalDispatchReady() error\n");
        return false;
    }

    // Take the current lock, then swap roles for the next call
    ProtoDispatcher::Lock(use_lock_a ? lock_a : lock_b);
    use_lock_a = !use_lock_a;
    return true;
}
// Runs one dispatch cycle on behalf of the controller: suspends the
// dispatcher thread, calls Dispatch(), acquires the lock that is not
// currently in use, releases the one that is, and resumes the dispatcher
// thread.
void ProtoDispatcher::Controller::OnDispatch()
{
    dispatcher.SuspendThread();
    dispatcher.Dispatch();

    // Hand-off: take the other lock before letting go of the current one
    const bool usingA = use_lock_a;
    ProtoDispatcher::Lock(usingA ? lock_b : lock_a);
    ProtoDispatcher::Unlock(usingA ? lock_a : lock_b);

    dispatcher.ResumeThread();
}