add-determinism 0.7.3

RPM buildroot helper to strip nondeterministic bits in files
Documentation
f

��_�Fc@s�dZddlZddlZddlZddlZddlZddlZddlZddlZddl	Z	ddl
Z
ddlZddlZddl
mZddlmZddlmZejdd��Ze�eed�d	�Gd
d�dej��Ze�eed�d	�Gdd
�d
e��Ze�eed�d	�Gdd�de��Ze�eed�d	�Gdd�de��Ze�eed�d	�e�eed�d�Gdd�de���Ze�eed�d	�Gdd�de��ZGdd�de�Zedk�r�e��dS)a�
This test suite exercises some system calls subject to interruption with EINTR,
to check that it is actually handled transparently.
It is intended to be run by the main test suite within a child process, to
ensure there is no background thread running (so that signals are delivered to
the correct thread).
Signals are generated in-process using setitimer(ITIMER_REAL), which allows
sub-second periodicity (contrarily to signal()).
�N)�support)�	os_helper)�
socket_helperccsJ|�2z
|VWn|���Yn0Wd�n1s<0YdS)zGContext manager killing the subprocess if a Python exception is raised.N)Zkill)�proc�r�4/usr/lib64/python3.10/test/eintrdata/eintr_tester.py�
kill_on_errors
r�	setitimerzrequires setitimer()c@sHeZdZdZdZdZdZdd�Zdd�Ze	dd	��Z
d
d�Zdd
�ZdS)�
EINTRBaseTestz Base class for EINTR tests. g�������?g�������?cCs|jd7_dS�N�)�signals)�self�signumZframerrr�
sighandler3�zEINTRBaseTest.sighandlercCsBd|_t�tj|j�|_t�tj|j|j�t	j
ddtjd�dS)NriXT)ZexitZfile)
r�signal�SIGALRMr�orig_handlerr�ITIMER_REAL�signal_delay�
signal_period�faulthandlerZdump_traceback_later�sysZ
__stderr__�r
rrr�setUp6s��zEINTRBaseTest.setUpcCst�tjdd�dS�Nr)rrrrrrr�
stop_alarmA�zEINTRBaseTest.stop_alarmcCs$|��t�tj|j�t��dS�N)rrrrrZcancel_dump_traceback_laterrrrr�tearDownEszEINTRBaseTest.tearDowncOs tjdf|}tj|fi|��S)Nz-c)rZ
executable�
subprocessZPopen)r
�argsZkwZcmd_argsrrrr J�zEINTRBaseTest.subprocessN)
�__name__�
__module__�__qualname__�__doc__rr�
sleep_timerrZstaticmethodrrr rrrrr	's
r	c@s|eZdZdZdd�Zdd�Zdd�Ze�e	e
d�d	�d
d��Zdd
�Zdd�Z
e�e	e
d�d�dd��Zdd�Zdd�ZdS)�OSEINTRTestz  EINTR tests for the os module. cCsd|j}|�|�S)Nzimport time; time.sleep(%r))r'r )r
�coderrr�new_sleep_processS�
zOSEINTRTest.new_sleep_processcsDd}�fdd�t|�D�}t|�D]
}|�q"|D]}|��q2dS)N�csg|]}����qSr)r*)Z.0�_rrrZ
<listcomp>Y�z3OSEINTRTest._test_wait_multiple.<locals>.<listcomp>)Zrange�wait)r
�	wait_funcZnumZ	processesr-rrrr�_test_wait_multipleWszOSEINTRTest._test_wait_multiplecCs|�tj�dSr)r1�osr/rrrr�	test_wait`rzOSEINTRTest.test_wait�wait3zrequires wait3()cC�|�dd��dS)NcSs
t�d�Sr)r2r4rrrr�<lambda>er.z(OSEINTRTest.test_wait3.<locals>.<lambda>)r1rrrr�
test_wait3crzOSEINTRTest.test_wait3cCs|��}||j�|��dSr)r*�pidr/)r
r0rrrr�_test_wait_singlegs
zOSEINTRTest._test_wait_singlecCr5)NcS�t�|d�Sr)r2Zwaitpid�r8rrrr6nr.z*OSEINTRTest.test_waitpid.<locals>.<lambda>�r9rrrr�test_waitpidmrzOSEINTRTest.test_waitpid�wait4zrequires wait4()cCr5)NcSr:r)r2r>r;rrrr6rr.z(OSEINTRTest.test_wait4.<locals>.<lambda>r<rrrr�
test_wait4przOSEINTRTest.test_wait4cCs�t��\}}|�tj|�gd�}d�dddd|d|jddd	d
df
�}|j|t|�|gd�}t|��Lt�|�|D]}|�	|t�
|t|���qx|�	|��d
�Wd�n1s�0YdS)N)shellosworldsspam�
zimport os, sys, time�zwr = int(sys.argv[1])�
datas = %r�sleep_time = %rzfor data in datas:z$    # let the parent block on read()�    time.sleep(sleep_time)z    os.write(wr, data)�Zpass_fdsr)
r2�pipe�
addCleanup�close�joinr'r �strr�assertEqualZread�lenr/)r
�rd�wr�datasr)r�datarrr�	test_readts*�


zOSEINTRTest.test_readcCs�t��\}}|�tj|�dtj}d�dddd|jdtjddd	d
dddd
ddddddf�}|j|t	|�|gd�}t
|��Xt�|�d}|t|�kr�|t�|t
|�|d��7}q�|�|��d�Wd�n1s�0YdS)N�xr@zimport io, os, sys, timerAzrd = int(sys.argv[1])rCzdata = b"x" * %s�data_len = len(data)z!# let the parent block on write()�time.sleep(sleep_time)zread_data = io.BytesIO()z+while len(read_data.getvalue()) < data_len:z%    chunk = os.read(rd, 2 * data_len)z    read_data.write(chunk)zvalue = read_data.getvalue()zif value != data:z0    raise Exception("read error: %s vs %s bytes"z-                    % (len(value), data_len))rEr)r2rFrGrHrZ
PIPE_MAX_SIZErIr'r rJrrLZwrite�
memoryviewrKr/)r
rMrNrPr)r�writtenrrr�
test_write�s>
�

zOSEINTRTest.test_writeN)r#r$r%r&r*r1r3�unittest�
skipUnless�hasattrr2r7r9r=r?rQrWrrrrr(Os	

r(c@s�eZdZdZe�eed�d�dd��Zdd�Z	e�eejd�d	�d
d��Z
dd
�Zdd�Zdd�Z
e�eejd�d�dd��Zdd�Ze�dd�e�eed�d�dd���Zdd�Ze�ejd kd!�d"d#��Zd$d%�Ze�ejd kd!�d&d'��Zd(S))�SocketEINTRTestz$ EINTR tests for the socket module. �
socketpairzneeds socketpair()c	Cs�t��\}}|�|j�gd�}d�ddddt|j�dt|j�d|d	|jdd
dddd
dddf�}|�	�}|j
|t|�|gd�}t|��H|��|D]}|�
|||t|���q�|�
|��d�Wd�n1s�0YdS)N)rRsyszr@�import os, socket, sys, timerA�fd = int(sys.argv[1])�family = %s�sock_type = %srBrCz)wr = socket.fromfd(fd, family, sock_type)�os.close(fd)zwith wr:z    for data in datas:z(        # let the parent block on recv()z        time.sleep(sleep_time)z        wr.sendall(data)rEr)�socketr\rGrHrI�int�family�typer'�filenor rJrrKrLr/)	r
Z	recv_funcrMrNrOr)�fdrrPrrr�
_test_recv�s8�
zSocketEINTRTest._test_recvcC�|�tjj�dSr)rhrbZrecvrrrr�	test_recv�rzSocketEINTRTest.test_recv�recvmsgzneeds recvmsg()cCr5)NcSs|�|�dSr)rk��sockrPrrrr6�r.z.SocketEINTRTest.test_recvmsg.<locals>.<lambda>)rhrrrr�test_recvmsg�rzSocketEINTRTest.test_recvmsgc
Cs(t��\}}|�|j�dtjd}d�ddddt|j�dt|j	�d	|j
d
tjddddd
dddddddddddddf�}|��}|j|t
|�|gd�}t|��h|��d}|t|�kr�||t|�|d��}	||	dur�t|�n|	7}q�|�|��d�Wd�n1�s0YdS)Nsxyzr,r@r]rAr^r_r`rCzdata = b"xyz" * %srSz)rd = socket.fromfd(fd, family, sock_type)razwith rd:z$    # let the parent block on send()rDz'    received_data = bytearray(data_len)z	    n = 0z    while n < data_len:z8        n += rd.recv_into(memoryview(received_data)[n:])zif received_data != data:z0    raise Exception("recv error: %s vs %s bytes"z5                    % (len(received_data), data_len))rEr)rbr\rGrHrZ
SOCK_MAX_SIZErIrcrdrer'rfr rJrrLrUrKr/)
r
Z	send_funcrMrNrPr)rgrrVZsentrrr�
_test_send�sL�
zSocketEINTRTest._test_sendcCrir)rorbZsendrrrr�	test_sendrzSocketEINTRTest.test_sendcCrir)rorbZsendallrrrr�test_sendallrzSocketEINTRTest.test_sendall�sendmsgzneeds sendmsg()cCr5)NcSs|�|g�Sr)rrrlrrrr6r.z.SocketEINTRTest.test_sendmsg.<locals>.<lambda>)rorrrr�test_sendmsgrzSocketEINTRTest.test_sendmsgcCs�t�tjdf�}|�|j�|��d}d�dddtjd|d|jdd	d
ddf
�}|�	|�}t
|��4|��\}}|��|�|�
�d�Wd�n1s�0YdS)
Nrrr@zimport socket, timerAz	host = %rz	port = %srCz# let parent block on accept()rTz,with socket.create_connection((host, port)):rD)rbZ
create_serverrZHOSTrGrHZgetsocknamerIr'r rZacceptrKr/)r
rmZportr)rZclient_sockr-rrr�test_accepts(�

zSocketEINTRTest.test_accepti
r,�mkfifozneeds mkfifo()cCs�tj}t�|�zt�|�Wn2tyP}z|�d|�WYd}~n
d}~00|�tj|�d�ddd|d|j	dddd|f	�}|�
|�}t|��(||�|�|�
�d	�Wd�n1s�0YdS)
Nzos.mkfifo(): %sr@�import os, timerAz	path = %arCz# let the parent blockrTr)r�TESTFN�unlinkr2ruZPermissionErrorZskipTestrGrIr'r rrKr/)r
Zdo_open_close_readerZdo_open_close_writerZfilenameZer)rrrr�
_test_open;s,
$�

zSocketEINTRTest._test_opencCst|d�}|��dS)NZw)�openrH)r
�pathZfprrr�python_openZr+zSocketEINTRTest.python_open�darwinz+hangs under macOS; see bpo-25234, bpo-35363cC�|�d|j�dS)Nzfp = open(path, 'r')
fp.close())ryr|rrrr�	test_open^��zSocketEINTRTest.test_opencCst�|tj�}t�|�dSr)r2rzZO_WRONLYrH)r
r{rgrrr�os_opendr"zSocketEINTRTest.os_opencCr~)Nz,fd = os.open(path, os.O_RDONLY)
os.close(fd))ryr�rrrr�test_os_openhr�zSocketEINTRTest.test_os_openN)r#r$r%r&rXrYrZrbrhrjrnrorprqrsrtrZrequires_freebsd_versionr2ryr|�skipIfr�platformrr�r�rrrrr[�s2
#
.

�
�r[c@seZdZdZdd�ZdS)�
TimeEINTRTestz" EINTR tests for the time module. cCs:t��}t�|j�|��t��|}|�||j�dSr)�time�	monotonic�sleepr'r�assertGreaterEqual�r
�t0�dtrrr�
test_sleepss
zTimeEINTRTest.test_sleepN)r#r$r%r&r�rrrrr�osr��pthread_sigmaskzneed signal.pthread_sigmask()c@sLeZdZdZdd�Ze�eed�d�dd��Z	e�eed�d�d	d
��Z
dS)�SignalEINTRTestz$ EINTR tests for the signal module. c
Cs�tj}t��}t�|dd��}|�tj||�d�ddt��dt|�d|jdd	f�}t�tj	|g�}|�tjtj
|g�t��}|�
|�}t|��$||�t��|}	Wd�n1s�0Y|�|��d
�dS)NcWsdSrr)r!rrrr6�r.z/SignalEINTRTest.check_sigwait.<locals>.<lambda>r@rvzpid = %szsignum = %srCrTzos.kill(pid, signum)r)rZSIGUSR1r2ZgetpidrGrIrcr'r�Z	SIG_BLOCKZSIG_UNBLOCKr�r�r rrKr/)
r
r0rr8Zold_handlerr)Zold_maskr�rr�rrr�
check_sigwait�s(

�	

*zSignalEINTRTest.check_sigwait�sigwaitinfozneed signal.sigwaitinfo()cC�dd�}|�|�dS)NcSst�|g�dSr)rr��rrrrr0�rz3SignalEINTRTest.test_sigwaitinfo.<locals>.wait_func�r��r
r0rrr�test_sigwaitinfo��z SignalEINTRTest.test_sigwaitinfo�sigtimedwaitcCr�)NcSst�|gd�dS)Ng^@)rr�r�rrrr0�rz4SignalEINTRTest.test_sigtimedwait.<locals>.wait_funcr�r�rrr�test_sigtimedwait�r�z!SignalEINTRTest.test_sigtimedwaitN)r#r$r%r&r�rXrYrZrr�r�rrrrr�{s�
�r�c@s�eZdZdZdd�Ze�ejdkd�e�	e
ed�d�dd	���Ze�	e
ed
�d�dd
��Z
e�	e
ed�d�dd��Ze�	e
ed�d�dd��ZdS)�SelectEINTRTestz$ EINTR tests for the select module. cCs@t��}t�ggg|j�t��|}|��|�||j�dSr)r�r��selectr'rr�r�rrr�test_select�s
zSelectEINTRTest.test_selectr}z(poll may fail on macOS; see issue #28087�pollzneed select.pollcCsFt��}t��}|�|jd�t��|}|��|�||j�dS�Ng@�@)r�r�r�r�r'rr��r
Zpollerr�r�rrr�	test_poll�szSelectEINTRTest.test_poll�epollzneed select.epollcCsNt��}|�|j�t��}|�|j�t��|}|��|�	||j�dSr)
r�r�rGrHr�r�r�r'rr�r�rrr�
test_epoll�szSelectEINTRTest.test_epoll�kqueuezneed select.kqueuecCsRt��}|�|j�t��}|�dd|j�t��|}|��|�	||j�dSr
)
r�r�rGrHr�r�Zcontrolr'rr�)r
r�r�r�rrr�test_kqueue��zSelectEINTRTest.test_kqueue�devpollzneed select.devpollcCsRt��}|�|j�t��}|�|jd�t��|}|��|�	||j�dSr�)
r�r�rGrHr�r�r�r'rr�r�rrr�test_devpoll�r�zSelectEINTRTest.test_devpollN)r#r$r%r&r�rXr�rr�rYrZr�r�r�r�r�rrrrr��s�	



r�c@s8eZdZdd�Ze�e��dkd�dd��Zdd�Z	d	S)
�
FNTLEINTRTestc
Cs:|�tjtj�d�ddtjd|d|jf�}t��}|�|�}t	|���t
tjd���}t��|}|dkr|td|��z,||tj
tjB�||tj�t�d	�Wq\ty�Yq�Yq\0||tj
�t��|}|�||j�|��Wd�n1�s0Y|��Wd�n1�s,0YdS)
Nr@zimport fcntl, timezwith open('%s', 'wb') as f:z   fcntl.%s(f, fcntl.LOCK_EX)z   time.sleep(%s)ZwbgN@z failed to sync child in %.1f secg{�G�z�?)rGrrxrwrIr'r�r�r rrzZ	Exception�fcntlZLOCK_EXZLOCK_NBZLOCK_UNr�ZBlockingIOErrorr�rr/)r
Z	lock_funcZ	lock_namer)Z
start_timerZfr�rrr�_lock�s2�


(zFNTLEINTRTest._lockZAIXzAIX returns PermissionErrorcC�|�tjd�dS)N�lockf)r�r�r�rrrr�
test_lockf	rzFNTLEINTRTest.test_lockfcCr�)N�flock)r�r�r�rrrr�
test_flock
rzFNTLEINTRTest.test_flockN)
r#r$r%r�rXr�r�Zsystemr�r�rrrrr��s 
r�Z__main__) r&Z
contextlibrr�r2r�r�rrbr rr�rXZtestrZtest.supportrrZcontextmanagerrrYrZZTestCaser	r(r[r�r�r�r�r#ZmainrrrrZ<module>sH


'h7�/8)