1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
//====== Copyright Valve Corporation, All rights reserved. ====================
#ifdef __GNUC__
// src/public/tier0/basetypes.h:104:30: error: assuming signed overflow does not occur when assuming that (X + c) < X is always false [-Werror=strict-overflow]
// current steamrt:scout gcc "g++ (SteamRT 4.8.4-1ubuntu15~12.04+steamrt1.2+srt1) 4.8.4" requires this at the top due to optimizations
#pragma GCC diagnostic ignored "-Wstrict-overflow"
#endif
#include <tier1/utlpriorityqueue.h>
#include "steamnetworkingsockets_thinker.h"
#ifdef IS_STEAMDATAGRAMROUTER
#include "router/sdr.h"
#else
#include "clientlib/steamnetworkingsockets_lowlevel.h"
#endif
// memdbgon must be the last include file in a .cpp file!!!
#include "tier0/memdbgon.h"
namespace SteamNetworkingSocketsLib {
/////////////////////////////////////////////////////////////////////////////
//
// Periodic processing
//
/////////////////////////////////////////////////////////////////////////////
struct ThinkerLess
{
bool operator()( const IThinker *a, const IThinker *b ) const
{
return a->GetNextThinkTime() > b->GetNextThinkTime();
}
};
class ThinkerSetIndex
{
public:
static void SetIndex( IThinker *p, int idx, void *pContext ) { p->m_queueIndex = idx; }
};
// Global queue of thinkers that currently have a think time scheduled.
// ThinkerLess supplies the ordering and ThinkerSetIndex writes each element's
// slot back into IThinker::m_queueIndex.  The code in this file treats the
// head element as the thinker that needs service soonest.
static CUtlPriorityQueue<IThinker*,ThinkerLess,ThinkerSetIndex> s_queueThinkers;
// A freshly constructed thinker has no think time scheduled (k_nThinkTime_Never)
// and holds no slot in the queue (index -1).
IThinker::IThinker()
	: m_usecNextThinkTime( k_nThinkTime_Never ),
	  m_queueIndex( -1 )
{
	// Nothing further to set up.
}
// Destructor: clears the next think time before the object goes away.
// NOTE(review): ClearNextThinkTime is declared outside this file; presumably it
// unschedules the thinker so the queue never holds a dangling pointer -- confirm.
IThinker::~IThinker()
{
	this->ClearNextThinkTime();
}
#ifdef __GNUC__
// older steamrt:scout gcc requires this also, probably getting confused by unbalanced push/pop
#pragma GCC diagnostic ignored "-Wstrict-overflow"
#endif
#ifdef IS_STEAMDATAGRAMROUTER
// Router build: stand-in lock type whose lock/unlock do nothing.
struct ShortDurationLock
{
	void lock() {}
	void unlock() {}
};
static ShortDurationLock s_mutexThinkerTable;
#else
// Lock guarding the thinker table.
// We do sometimes take another lock, but it is always a "try"
static ShortDurationLock s_mutexThinkerTable( "thinker", ShortDurationLock::k_nOrder_Max );
#endif
// The base class isn't lockable, so an attempt to lock it always reports success.
bool IThinker::TryLock() const
{
	return true;
}
// Locked half of EnsureMinThinkTime: under the thinker table mutex, pull the
// schedule forward if the requested time is earlier than the current one.
// A request that is not earlier leaves the schedule untouched.
void IThinker::InternalEnsureMinThinkTime( SteamNetworkingMicroseconds usecTargetThinkTime )
{
	s_mutexThinkerTable.lock();

	// Compare again now that we hold the mutex.
	const bool bWantsEarlierWake = ( usecTargetThinkTime < m_usecNextThinkTime );
	if ( bWantsEarlierWake )
	{
		InternalSetNextThinkTime( usecTargetThinkTime );
	}

	s_mutexThinkerTable.unlock();
}
// Schedule (or reschedule) this thinker for the given time.
//
// The comparison against m_usecNextThinkTime below is done WITHOUT the thinker
// mutex, on purpose, as a cheap way to skip the lock when the requested time is
// already what we have.  The service thread may be writing the member at the
// same moment.  The design rationale for tolerating that race:
//   1. The value read is only a hint.  When it does not match the request we
//      fall through to the locked path, which applies the requested time while
//      holding the mutex.
//   2. On all supported 64-bit platforms an aligned 64-bit load is atomic at the
//      hardware level, so a torn read is not expected in practice.
// ATTR_NO_SANITIZE_THREAD is used instead of turning the member into a
// std::atomic<> because std::atomic would silence TSan for every access to the
// variable and could hide future unintended races; the attribute limits the
// suppression to this one known lockless access.
ATTR_NO_SANITIZE_THREAD void IThinker::SetNextThinkTime( SteamNetworkingMicroseconds usecTargetThinkTime )
{
	// Only bother with the mutex when the request differs from what we read.
	if ( usecTargetThinkTime != m_usecNextThinkTime )
	{
		s_mutexThinkerTable.lock();
		InternalSetNextThinkTime( usecTargetThinkTime );
		s_mutexThinkerTable.unlock();
	}
}
// Make sure this thinker wakes no later than the given time.  A request that is
// not earlier than the current schedule is ignored.
ATTR_NO_SANITIZE_THREAD void IThinker::EnsureMinThinkTime( SteamNetworkingMicroseconds usecTargetThinkTime )
{
	// Lockless fast-path read of m_usecNextThinkTime -- the race is intentional;
	// see the explanation on SetNextThinkTime.
	if ( usecTargetThinkTime >= m_usecNextThinkTime )
		return;

	// Looks earlier than what we have; the locked helper compares again under the mutex.
	InternalEnsureMinThinkTime( usecTargetThinkTime );
}
// Core scheduling routine: records the new think time on this thinker and keeps
// its entry in s_queueThinkers in step.  Passing k_nThinkTime_Never unschedules.
// Every call site in this file invokes it while holding s_mutexThinkerTable.
void IThinker::InternalSetNextThinkTime( SteamNetworkingMicroseconds usecTargetThinkTime )
{
	// Defend against a bogus timestamp.  Zero is reserved (it usually means an
	// uninitialized value), and the initial local time value is effectively
	// infinite compared to the intervals used here, so a non-positive timestamp
	// should never be legitimate.  See k_nThinkTime_ASAP.
	if ( unlikely( usecTargetThinkTime <= 0 ) )
	{
		AssertMsg1( false, "Attempt to set target think time to %lld", static_cast<long long>( usecTargetThinkTime ) );

		// Recover by substituting a time slightly in the future.
		usecTargetThinkTime = SteamNetworkingSockets_GetLocalTimestamp() + 2000;
	}

	// Request to unschedule?
	if ( usecTargetThinkTime == k_nThinkTime_Never )
	{
		if ( m_queueIndex >= 0 )
		{
			Assert( s_queueThinkers.Element( m_queueIndex ) == this );
			s_queueThinkers.RemoveAt( m_queueIndex );

			// Removal is expected to have reset our slot through ThinkerSetIndex.
			Assert( m_queueIndex == -1 );
		}
		m_usecNextThinkTime = k_nThinkTime_Never;
		return;
	}

#ifndef IS_STEAMDATAGRAMROUTER
	// Remember when the head of the queue wanted service before we change anything.
	SteamNetworkingMicroseconds usecPrevHeadTime = k_nThinkTime_Never;
	if ( s_queueThinkers.Count() > 0 )
		usecPrevHeadTime = s_queueThinkers.ElementAtHead()->GetNextThinkTime();
#endif

	if ( m_queueIndex >= 0 )
	{
		// Already in the queue: store the new time, then let the queue re-position us.
		Assert( s_queueThinkers.Element( m_queueIndex ) == this );
		Assert( m_usecNextThinkTime != k_nThinkTime_Never );
		m_usecNextThinkTime = usecTargetThinkTime;
		s_queueThinkers.RevaluateElement( m_queueIndex );
	}
	else
	{
		// Not in the queue yet: store the time first, then insert.
		Assert( m_usecNextThinkTime == k_nThinkTime_Never );
		m_usecNextThinkTime = usecTargetThinkTime;
		s_queueThinkers.Insert( this );
	}

	// Either way we must now hold a valid slot that points back at us.
	Assert( m_queueIndex >= 0 );
	Assert( s_queueThinkers.Element( m_queueIndex ) == this );

#ifndef IS_STEAMDATAGRAMROUTER
	// Do we need service before the service thread was previously scheduled to
	// wake up?  If so, wake it now so that it can redo its scheduling work.
	// NOTE: On Windows we could use a waitable timer.  This would avoid waking
	// the service thread just to re-schedule when it should wake up for real.
	if ( m_usecNextThinkTime < usecPrevHeadTime )
		WakeServiceThread();
#endif
}
// Returns the think time of the thinker at the head of the queue, or
// k_nThinkTime_Never when the queue is empty.  The queue is read while holding
// the thinker table mutex.
SteamNetworkingMicroseconds IThinker::Thinker_GetNextScheduledThinkTime()
{
	s_mutexThinkerTable.lock();
	const SteamNetworkingMicroseconds usecHeadTime = ( s_queueThinkers.Count() != 0 )
		? s_queueThinkers.ElementAtHead()->GetNextThinkTime()
		: k_nThinkTime_Never;
	s_mutexThinkerTable.unlock();

	return usecHeadTime;
}
// Services every thinker whose scheduled time has passed.
//
// Takes s_mutexThinkerTable, then repeatedly inspects the head of
// s_queueThinkers.  The loop ends when the queue is empty, when the head's
// think time is not strictly earlier than the current timestamp, or when the
// iteration guard below trips.  The table lock is dropped around each Think()
// call and is released before returning.
void IThinker::Thinker_ProcessThinkers()
{
// We need the lock to access the thinker queue
s_mutexThinkerTable.lock();
// Until the queue is empty
int nIterations = 0;
while ( s_queueThinkers.Count() > 0 )
{
// Grab the head element
IThinker *pNextThinker = s_queueThinkers.ElementAtHead();
// Refetch timestamp each time. The reason is that certain thinkers
// may pass through to other systems (e.g. fake lag) that fetch the time.
// If we don't update the time here, that code may have used the newer
// timestamp (e.g. to mark when a packet was received) and then
// in our next iteration, we will use an older timestamp to process
// a thinker.
SteamNetworkingMicroseconds usecNow = SteamNetworkingSockets_GetLocalTimestamp();
// Scheduled too far in the future?
// (A think time exactly equal to usecNow also counts as "not yet due".)
if ( pNextThinker->GetNextThinkTime() >= usecNow )
{
// Keep waiting
break;
}
// Runaway guard.  The counter advances on every pass that reaches this
// point, including passes where TryLock() fails below.
++nIterations;
if ( nIterations > 10000 )
{
AssertMsg1( false, "Processed thinkers %d times -- probably one thinker keeps requesting an immediate wakeup call.", nIterations );
break;
}
// Try to acquire the thinker's lock, if any
// NOTE(review): nothing in this function releases the lock obtained here;
// presumably Think() (or the derived class) is responsible -- confirm.
if ( pNextThinker->TryLock() )
{
// Go ahead and clear his think time now and remove him
// from the heap. He needs to schedule a new think time
// if he needs service again. For thinkers that need frequent
// service, removing them and then re-inserting them when
// they reschedule is a bit of extra work that could be
// optimized by trying to not remove them now, but adjusting
// them once we know when they want to think. But this
// is probably just a bit too complicated for the expected
// benefit. If the number of total Thinkers is relatively
// small (which it probably will be), the heap operations
// are probably negligible.
pNextThinker->InternalSetNextThinkTime( k_nThinkTime_Never );
// Release the global thinker table lock, so that other threads
// can schedule work while we are doing work here.
// (E.g. other connections can be accessed in the main thread,
// and can mark the connection to wake up.)
s_mutexThinkerTable.unlock();
// Execute callback. (Note: this could result
// in self-destruction or essentially any change
// to the rest of the queue.)
pNextThinker->Think( usecNow );
// pNextThinker must not be used past this point, since Think() may
// have destroyed the object.
// Re-acquire table lock for the next check
s_mutexThinkerTable.lock();
}
else
{
// Deadlock! Should be extremely rare. Reschedule him for 1ms in the
// future, and we'll try again.
// (We still hold the table lock here, so the internal setter is called directly.)
pNextThinker->InternalSetNextThinkTime( usecNow + 1000 );
}
}
// Release table lock
s_mutexThinkerTable.unlock();
}
#ifdef DBGFLAG_VALIDATE
// Debug-only (DBGFLAG_VALIDATE) hook that runs validation over the global
// thinker queue.
// NOTE(review): 'validator' is never named in the body; presumably
// ValidateRecursive is a macro that refers to it by name -- confirm before
// renaming the parameter.
// NOTE(review): the queue is accessed here without taking s_mutexThinkerTable;
// verify that callers make this safe.
void Thinker_ValidateStatics( CValidator &validator )
{
ValidateRecursive( s_queueThinkers );
}
void IThinker::Validate( CValidator &validator, const char *pchName )
{
}
#endif
} // namespace SteamNetworkingSocketsLib